Skip to content

Commit

Permalink
Merge pull request #22000 from taosdata/mark/tmq
Browse files Browse the repository at this point in the history
fix:set commitOffset is send msg success & optimize log & add test ca…
  • Loading branch information
hjxilinx committed Jul 7, 2023
2 parents b7bf492 + 4436eb7 commit e921117
Show file tree
Hide file tree
Showing 3 changed files with 145 additions and 3 deletions.
4 changes: 3 additions & 1 deletion source/client/src/clientTmq.c
Expand Up @@ -678,6 +678,8 @@ static void asyncCommitOffset(tmq_t* tmq, const TAOS_RES* pRes, int32_t type, tm
taosMemoryFree(pParamSet);
pCommitFp(tmq, code, userParam);
}
// update the offset value.
pVg->offsetInfo.committedOffset = pVg->offsetInfo.currentOffset;
} else { // do not perform commit, callback user function directly.
taosMemoryFree(pParamSet);
pCommitFp(tmq, code, userParam);
Expand Down Expand Up @@ -2712,7 +2714,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
// char offsetBuf[TSDB_OFFSET_LEN] = {0};
// tFormatOffset(offsetBuf, tListLen(offsetBuf), &pOffsetInfo->currentOffset);

tscInfo("vgId:%d offset is old to:%"PRId64, p->vgId, p->currentOffset);
tscInfo("vgId:%d offset is update to:%"PRId64, p->vgId, p->currentOffset);

pOffsetInfo->walVerBegin = p->begin;
pOffsetInfo->walVerEnd = p->end;
Expand Down
140 changes: 140 additions & 0 deletions source/client/test/clientTests.cpp
Expand Up @@ -1073,6 +1073,146 @@ TEST(clientCase, sub_db_test) {
fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows);
}

TEST(clientCase, td_25129) {
// taos_options(TSDB_OPTION_CONFIGDIR, "~/first/cfg");

TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(pConn, nullptr);

tmq_conf_t* conf = tmq_conf_new();

tmq_conf_set(conf, "enable.auto.commit", "false");
tmq_conf_set(conf, "auto.commit.interval.ms", "2000");
tmq_conf_set(conf, "group.id", "group_id_2");
tmq_conf_set(conf, "td.connect.user", "root");
tmq_conf_set(conf, "td.connect.pass", "taosdata");
tmq_conf_set(conf, "auto.offset.reset", "earliest");
tmq_conf_set(conf, "msg.with.table.name", "true");

tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
tmq_conf_destroy(conf);

// 创建订阅 topics 列表
tmq_list_t* topicList = tmq_list_new();
tmq_list_append(topicList, "tp");

// 启动订阅
tmq_subscribe(tmq, topicList);
tmq_list_destroy(topicList);

TAOS_FIELD* fields = NULL;
int32_t numOfFields = 0;
int32_t precision = 0;
int32_t totalRows = 0;
int32_t msgCnt = 0;
int32_t timeout = 2000;

int32_t count = 0;

tmq_topic_assignment* pAssign = NULL;
int32_t numOfAssign = 0;

int32_t code = tmq_get_topic_assignment(tmq, "tp", &pAssign, &numOfAssign);
if (code != 0) {
printf("error occurs:%s\n", tmq_err2str(code));
tmq_free_assignment(pAssign);
tmq_consumer_close(tmq);
taos_close(pConn);
fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows);
return;
}

for(int i = 0; i < numOfAssign; i++){
printf("assign i:%d, vgId:%d, offset:%lld, start:%lld, end:%lld\n", i, pAssign[i].vgId, pAssign[i].currentOffset, pAssign[i].begin, pAssign[i].end);
}

// tmq_offset_seek(tmq, "tp", pAssign[0].vgId, 4);
tmq_free_assignment(pAssign);

code = tmq_get_topic_assignment(tmq, "tp", &pAssign, &numOfAssign);
if (code != 0) {
printf("error occurs:%s\n", tmq_err2str(code));
tmq_free_assignment(pAssign);
tmq_consumer_close(tmq);
taos_close(pConn);
fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows);
return;
}

for(int i = 0; i < numOfAssign; i++){
printf("assign i:%d, vgId:%d, offset:%lld, start:%lld, end:%lld\n", i, pAssign[i].vgId, pAssign[i].currentOffset, pAssign[i].begin, pAssign[i].end);
}

tmq_free_assignment(pAssign);

code = tmq_get_topic_assignment(tmq, "tp", &pAssign, &numOfAssign);
if (code != 0) {
printf("error occurs:%s\n", tmq_err2str(code));
tmq_free_assignment(pAssign);
tmq_consumer_close(tmq);
taos_close(pConn);
fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows);
return;
}

for(int i = 0; i < numOfAssign; i++){
printf("assign i:%d, vgId:%d, offset:%lld, start:%lld, end:%lld\n", i, pAssign[i].vgId, pAssign[i].currentOffset, pAssign[i].begin, pAssign[i].end);
}

while (1) {
TAOS_RES* pRes = tmq_consumer_poll(tmq, timeout);
if (pRes) {
char buf[128];

const char* topicName = tmq_get_topic_name(pRes);
// const char* dbName = tmq_get_db_name(pRes);
// int32_t vgroupId = tmq_get_vgroup_id(pRes);
//
// printf("topic: %s\n", topicName);
// printf("db: %s\n", dbName);
// printf("vgroup id: %d\n", vgroupId);

printSubResults(pRes, &totalRows);
} else {
tmq_offset_seek(tmq, "tp", pAssign[0].vgId, pAssign[0].currentOffset);
tmq_offset_seek(tmq, "tp", pAssign[1].vgId, pAssign[1].currentOffset);
continue;
}

// tmq_commit_sync(tmq, pRes);
if (pRes != NULL) {
taos_free_result(pRes);
// if ((++count) > 1) {
// break;
// }
} else {
break;
}

// tmq_offset_seek(tmq, "tp", pAssign[0].vgId, pAssign[0].begin);
}

tmq_free_assignment(pAssign);

code = tmq_get_topic_assignment(tmq, "tp", &pAssign, &numOfAssign);
if (code != 0) {
printf("error occurs:%s\n", tmq_err2str(code));
tmq_free_assignment(pAssign);
tmq_consumer_close(tmq);
taos_close(pConn);
fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows);
return;
}

for(int i = 0; i < numOfAssign; i++){
printf("assign i:%d, vgId:%d, offset:%lld, start:%lld, end:%lld\n", i, pAssign[i].vgId, pAssign[i].currentOffset, pAssign[i].begin, pAssign[i].end);
}

tmq_consumer_close(tmq);
taos_close(pConn);
fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows);
}

TEST(clientCase, sub_tb_test) {
taos_options(TSDB_OPTION_CONFIGDIR, "~/first/cfg");

Expand Down
4 changes: 2 additions & 2 deletions source/dnode/vnode/src/tq/tq.c
Expand Up @@ -490,7 +490,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
if (!exec) {
tqSetHandleExec(pHandle);
// qSetTaskCode(pHandle->execHandle.task, TDB_CODE_SUCCESS);
tqDebug("tmq poll: consumer:0x%" PRIx64 "vgId:%d, topic:%s, set handle exec, pHandle:%p", consumerId, vgId,
tqDebug("tmq poll: consumer:0x%" PRIx64 " vgId:%d, topic:%s, set handle exec, pHandle:%p", consumerId, vgId,
req.subKey, pHandle);
taosWUnLockLatch(&pTq->lock);
break;
Expand Down Expand Up @@ -518,7 +518,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
code = tqExtractDataForMq(pTq, pHandle, &req, pMsg);
tqSetHandleIdle(pHandle);

tqDebug("tmq poll: consumer:0x%" PRIx64 "vgId:%d, topic:%s, , set handle idle, pHandle:%p", consumerId, vgId,
tqDebug("tmq poll: consumer:0x%" PRIx64 " vgId:%d, topic:%s, set handle idle, pHandle:%p", consumerId, vgId,
req.subKey, pHandle);
return code;
}
Expand Down

0 comments on commit e921117

Please sign in to comment.