Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(tmq): disable non-leader vnode responsing the poll request. #20490

Merged
merged 1 commit into from Mar 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
187 changes: 96 additions & 91 deletions source/client/src/clientTmq.c
Expand Up @@ -24,6 +24,8 @@
#include "tref.h"
#include "ttimer.h"

#define EMPTY_BLOCK_POLL_IDLE_DURATION 100

struct SMqMgmt {
int8_t inited;
tmr_h timer;
Expand Down Expand Up @@ -68,18 +70,16 @@ struct tmq_conf_t {
};

struct tmq_t {
int64_t refId;
// conf
char groupId[TSDB_CGROUP_LEN];
char clientId[256];
int8_t withTbName;
int8_t useSnapshot;
int8_t autoCommit;
int32_t autoCommitInterval;
int32_t resetOffsetCfg;
uint64_t consumerId;
bool hbBgEnable;

int64_t refId;
char groupId[TSDB_CGROUP_LEN];
char clientId[256];
int8_t withTbName;
int8_t useSnapshot;
int8_t autoCommit;
int32_t autoCommitInterval;
int32_t resetOffsetCfg;
uint64_t consumerId;
bool hbBgEnable;
tmq_commit_cb* commitCb;
void* commitCbUserParam;

Expand All @@ -93,11 +93,10 @@ struct tmq_t {
int64_t pollCnt;

// timer
tmr_h hbLiveTimer;
tmr_h epTimer;
tmr_h reportTimer;
tmr_h commitTimer;

tmr_h hbLiveTimer;
tmr_h epTimer;
tmr_h reportTimer;
tmr_h commitTimer;
STscObj* pTscObj; // connection
SArray* clientTopics; // SArray<SMqClientTopic>
STaosQueue* mqueue; // queue of rsp
Expand Down Expand Up @@ -149,6 +148,7 @@ typedef struct {
SMqClientVg* vgHandle;
SMqClientTopic* topicHandle;
uint64_t reqId;
SEpSet* pEpset;
union {
SMqDataRsp dataRsp;
SMqMetaRsp metaRsp;
Expand Down Expand Up @@ -403,65 +403,42 @@ static SMqClientVg* foundClientVg(SArray* pTopicList, const char* pName, int32_t
return NULL;
}

// Two problems do not need to be addressed here
// 1. update to of epset. the response of poll request will automatically handle this problem
// 2. commit failure. This one needs to be resolved.
static int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) {
SMqCommitCbParam* pParam = (SMqCommitCbParam*)param;
SMqCommitCbParamSet* pParamSet = (SMqCommitCbParamSet*)pParam->params;

if (code != TSDB_CODE_SUCCESS) { // if commit offset failed, let's try again
taosThreadMutexLock(&pParam->pTmq->lock);
int32_t numOfVgroups, index;
SMqClientVg* pVg = foundClientVg(pParam->pTmq->clientTopics, pParam->topicName, pParam->vgId, &index, &numOfVgroups);

if (pVg == NULL) {
tscDebug("consumer:0x%" PRIx64
" subKey:%s vgId:%d commit failed, code:%s has been transferred to other consumer, no need retry ordinal:%d/%d",
pParam->pTmq->consumerId, pParam->pOffset->subKey, pParam->vgId, tstrerror(code), index + 1, numOfVgroups);
} else { // let's retry the commit
int32_t code1 = doSendCommitMsg(pParam->pTmq, pVg, pParam->topicName, pParamSet, index, numOfVgroups);
if (code1 != TSDB_CODE_SUCCESS) { // retry failed.
tscError("consumer:0x%" PRIx64 " topic:%s vgId:%d offset:%" PRId64
" retry failed, ignore this commit. code:%s ordinal:%d/%d",
pParam->pTmq->consumerId, pParam->topicName, pVg->vgId, pVg->committedOffset.version,
tstrerror(terrno), index + 1, numOfVgroups);
}
}

taosThreadMutexUnlock(&pParam->pTmq->lock);

taosMemoryFree(pParam->pOffset);
taosMemoryFree(pBuf->pData);
taosMemoryFree(pBuf->pEpSet);

tmqCommitRspCountDown(pParamSet, pParam->pTmq->consumerId, pParam->topicName, pParam->vgId);
return 0;
}

// todo replace the pTmq with refId
taosThreadMutexLock(&pParam->pTmq->lock);
tmq_t* pTmq = pParam->pTmq;
int32_t index = 0, numOfVgroups = 0;

SMqClientVg* pVg = foundClientVg(pTmq->clientTopics, pParam->topicName, pParam->vgId, &index, &numOfVgroups);
if (pVg == NULL) {
tscDebug("consumer:0x%" PRIx64 " subKey:%s vgId:%d has been transferred to other consumer, ordinal:%d/%d",
pParam->pTmq->consumerId, pParam->pOffset->subKey, pParam->vgId, index + 1, numOfVgroups);
} else { // update the epset if needed
if (pBuf->pEpSet != NULL) {
SEp* pEp = GET_ACTIVE_EP(pBuf->pEpSet);
SEp* pOld = GET_ACTIVE_EP(&(pVg->epSet));

tscDebug("consumer:0x%" PRIx64 " subKey:%s update the epset vgId:%d, ep:%s:%d, old ep:%s:%d, ordinal:%d/%d",
pTmq->consumerId, pParam->pOffset->subKey, pParam->vgId, pEp->fqdn, pEp->port, pOld->fqdn, pOld->port,
index + 1, numOfVgroups);

pVg->epSet = *pBuf->pEpSet;
}

tscDebug("consumer:0x%" PRIx64 " subKey:%s vgId:%d, commit offset success. ordinal:%d/%d", pTmq->consumerId,
pParam->pOffset->subKey, pParam->vgId, index + 1, numOfVgroups);
}

taosThreadMutexUnlock(&pParam->pTmq->lock);
// if (code != TSDB_CODE_SUCCESS) { // if commit offset failed, let's try again
// taosThreadMutexLock(&pParam->pTmq->lock);
// int32_t numOfVgroups, index;
// SMqClientVg* pVg = foundClientVg(pParam->pTmq->clientTopics, pParam->topicName, pParam->vgId, &index, &numOfVgroups);
// if (pVg == NULL) {
// tscDebug("consumer:0x%" PRIx64
// " subKey:%s vgId:%d commit failed, code:%s has been transferred to other consumer, no need retry ordinal:%d/%d",
// pParam->pTmq->consumerId, pParam->pOffset->subKey, pParam->vgId, tstrerror(code), index + 1, numOfVgroups);
// } else { // let's retry the commit
// int32_t code1 = doSendCommitMsg(pParam->pTmq, pVg, pParam->topicName, pParamSet, index, numOfVgroups);
// if (code1 != TSDB_CODE_SUCCESS) { // retry failed.
// tscError("consumer:0x%" PRIx64 " topic:%s vgId:%d offset:%" PRId64
// " retry failed, ignore this commit. code:%s ordinal:%d/%d",
// pParam->pTmq->consumerId, pParam->topicName, pVg->vgId, pVg->committedOffset.version,
// tstrerror(terrno), index + 1, numOfVgroups);
// }
// }
//
// taosThreadMutexUnlock(&pParam->pTmq->lock);
//
// taosMemoryFree(pParam->pOffset);
// taosMemoryFree(pBuf->pData);
// taosMemoryFree(pBuf->pEpSet);
//
// tmqCommitRspCountDown(pParamSet, pParam->pTmq->consumerId, pParam->topicName, pParam->vgId);
// return 0;
// }
//
// // todo replace the pTmq with refId

taosMemoryFree(pParam->pOffset);
taosMemoryFree(pBuf->pData);
Expand Down Expand Up @@ -892,15 +869,21 @@ static void tmqFreeRspWrapper(SMqRspWrapper* rspWrapper) {
tDeleteSMqAskEpRsp(&pEpRspWrapper->msg);
} else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RSP) {
SMqPollRspWrapper* pRsp = (SMqPollRspWrapper*)rspWrapper;
taosMemoryFreeClear(pRsp->pEpset);

taosArrayDestroyP(pRsp->dataRsp.blockData, taosMemoryFree);
taosArrayDestroy(pRsp->dataRsp.blockDataLen);
taosArrayDestroyP(pRsp->dataRsp.blockTbName, taosMemoryFree);
taosArrayDestroyP(pRsp->dataRsp.blockSchema, (FDelete)tDeleteSSchemaWrapper);
} else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) {
SMqPollRspWrapper* pRsp = (SMqPollRspWrapper*)rspWrapper;
taosMemoryFreeClear(pRsp->pEpset);

taosMemoryFree(pRsp->metaRsp.metaRsp);
} else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__TAOSX_RSP) {
SMqPollRspWrapper* pRsp = (SMqPollRspWrapper*)rspWrapper;
taosMemoryFreeClear(pRsp->pEpset);

taosArrayDestroyP(pRsp->taosxRsp.blockData, taosMemoryFree);
taosArrayDestroy(pRsp->taosxRsp.blockDataLen);
taosArrayDestroyP(pRsp->taosxRsp.blockTbName, taosMemoryFree);
Expand Down Expand Up @@ -1321,7 +1304,9 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
pRspWrapper->vgHandle = pVg;
pRspWrapper->topicHandle = pTopic;
pRspWrapper->reqId = requestId;
pRspWrapper->pEpset = pMsg->pEpSet;

pMsg->pEpSet = NULL;
if (rspType == TMQ_MSG_TYPE__POLL_RSP) {
SDecoder decoder;
tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
Expand Down Expand Up @@ -1350,7 +1335,6 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
}

taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet);
taosWriteQitem(tmq->mqueue, pRspWrapper);

tscDebug("consumer:0x%" PRIx64 " put poll res into mqueue, type:%d, vgId:%d, total in queue:%d, reqId:0x%" PRIx64,
Expand Down Expand Up @@ -1516,6 +1500,14 @@ static int32_t tmqAskEpCb(void* param, SDataBuf* pMsg, int32_t code) {
if (head->epoch <= epoch) {
tscDebug("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d, no need to update local ep",
tmq->consumerId, head->epoch, epoch);
if (tmq->status == TMQ_CONSUMER_STATUS__RECOVER) {
SMqAskEpRsp rsp;
tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp);
int8_t flag = (taosArrayGetSize(rsp.topics) == 0) ? TMQ_CONSUMER_STATUS__NO_TOPIC : TMQ_CONSUMER_STATUS__READY;
atomic_store_8(&tmq->status, flag);
tDeleteSMqAskEpRsp(&rsp);
}

goto END;
}

Expand Down Expand Up @@ -1702,13 +1694,13 @@ static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {

for (int j = 0; j < numOfVg; j++) {
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
if (taosGetTimestampMs() - pVg->emptyBlockReceiveTs < 100) { // less than 100ms
if (taosGetTimestampMs() - pVg->emptyBlockReceiveTs < EMPTY_BLOCK_POLL_IDLE_DURATION) { // less than 100ms
tscTrace("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for 100ms before start next poll", tmq->consumerId, tmq->epoch,
pVg->vgId);
continue;
}

int32_t vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT);
int32_t vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT);
if (vgStatus == TMQ_VG_STATUS__WAIT) {
int32_t vgSkipCnt = atomic_add_fetch_32(&pVg->vgSkipCnt, 1);
tscTrace("consumer:0x%" PRIx64 " epoch %d wait poll-rsp, skip vgId:%d skip cnt %d", tmq->consumerId, tmq->epoch,
Expand All @@ -1731,6 +1723,7 @@ static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
}
}

tscDebug("consumer:0x%" PRIx64 " end to poll data", tmq->consumerId);
return 0;
}

Expand Down Expand Up @@ -1782,31 +1775,43 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {

/*atomic_sub_fetch_32(&tmq->readyRequest, 1);*/
int32_t consumerEpoch = atomic_load_32(&tmq->epoch);
if (pollRspWrapper->dataRsp.head.epoch == consumerEpoch) {
SMqDataRsp* pDataRsp = &pollRspWrapper->dataRsp;

if (pDataRsp->head.epoch == consumerEpoch) {
// todo fix it: race condition
SMqClientVg* pVg = pollRspWrapper->vgHandle;
pVg->currentOffset = pollRspWrapper->dataRsp.rspOffset;

// update the epset
if (pollRspWrapper->pEpset != NULL) {
SEp* pEp = GET_ACTIVE_EP(pollRspWrapper->pEpset);
SEp* pOld = GET_ACTIVE_EP(&(pVg->epSet));
tscDebug("consumer:0x%" PRIx64 " update epset vgId:%d, ep:%s:%d, old ep:%s:%d", tmq->consumerId,
pVg->vgId, pEp->fqdn, pEp->port, pOld->fqdn, pOld->port);
pVg->epSet = *pollRspWrapper->pEpset;
}

pVg->currentOffset = pDataRsp->rspOffset;
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);

if (pollRspWrapper->dataRsp.blockNum == 0) {
tscDebug("consumer:0x%" PRIx64 " empty block received, vgId:%d, reqId:0x%" PRIx64, tmq->consumerId, pVg->vgId,
pollRspWrapper->reqId);
char buf[80];
tFormatOffset(buf, 80, &pDataRsp->rspOffset);
if (pDataRsp->blockNum == 0) {
tscDebug("consumer:0x%" PRIx64 " empty block received, vgId:%d, offset:%s, reqId:0x%" PRIx64, tmq->consumerId,
pVg->vgId, buf, pollRspWrapper->reqId);
taosFreeQitem(pollRspWrapper);
rspWrapper = NULL;
continue;
}
} else { // build rsp
SMqRspObj* pRsp = tmqBuildRspFromWrapper(pollRspWrapper);
tscDebug("consumer:0x%" PRIx64 " process poll rsp, vgId:%d, offset:%s, blocks:%d, reqId:0x%" PRIx64,
tmq->consumerId, pVg->vgId, buf, pDataRsp->blockNum, pollRspWrapper->reqId);

// build rsp
char buf[80];
tFormatOffset(buf, 80, &pVg->currentOffset);
SMqRspObj* pRsp = tmqBuildRspFromWrapper(pollRspWrapper);
tscDebug("consumer:0x%" PRIx64 " process poll rsp, vgId:%d, offset:%s, blocks:%d, reqId:0x%"PRIx64, tmq->consumerId,
pVg->vgId, buf, pollRspWrapper->dataRsp.blockNum, pollRspWrapper->reqId);

taosFreeQitem(pollRspWrapper);
return pRsp;
taosFreeQitem(pollRspWrapper);
return pRsp;
}
} else {
tscDebug("consumer:0x%" PRIx64 " msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
tmq->consumerId, pollRspWrapper->dataRsp.head.epoch, consumerEpoch);
tmq->consumerId, pDataRsp->head.epoch, consumerEpoch);
tmqFreeRspWrapper(rspWrapper);
taosFreeQitem(pollRspWrapper);
}
Expand Down
2 changes: 1 addition & 1 deletion source/dnode/mnode/impl/src/mndConsumer.c
Expand Up @@ -101,7 +101,7 @@ void mndRebCntDec() {
int32_t newVal = val - 1;
int32_t oldVal = atomic_val_compare_exchange_32(&mqRebInExecCnt, val, newVal);
if (oldVal == val) {
mInfo("rebalance trans end, rebalance counter:%d", newVal);
mDebug("rebalance trans end, rebalance counter:%d", newVal);
break;
}
}
Expand Down
2 changes: 1 addition & 1 deletion source/dnode/vnode/src/vnd/vnodeSvr.c
Expand Up @@ -510,7 +510,7 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
vTrace("vgId:%d, msg:%p in fetch queue is processing", pVnode->config.vgId, pMsg);
if ((pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_VND_TABLE_META || pMsg->msgType == TDMT_VND_TABLE_CFG ||
pMsg->msgType == TDMT_VND_BATCH_META) &&
pMsg->msgType == TDMT_VND_BATCH_META || pMsg->msgType == TDMT_VND_TMQ_CONSUME) &&
!syncIsReadyForRead(pVnode->sync)) {
vnodeRedirectRpcMsg(pVnode, pMsg, terrno);
return 0;
Expand Down