Skip to content

Commit

Permalink
Merge pull request #22106 from taosdata/feat/TD-22970
Browse files Browse the repository at this point in the history
feat/TD-22970
  • Loading branch information
gccgdb1234 committed Aug 22, 2023
2 parents aeb098c + 9f004c2 commit e3eb0f9
Show file tree
Hide file tree
Showing 31 changed files with 1,303 additions and 91 deletions.
11 changes: 8 additions & 3 deletions include/common/tmsg.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ static inline bool tmsgIsValid(tmsg_t type) {
}
static inline bool vnodeIsMsgBlock(tmsg_t type) {
return (type == TDMT_VND_CREATE_TABLE) || (type == TDMT_VND_ALTER_TABLE) || (type == TDMT_VND_DROP_TABLE) ||
(type == TDMT_VND_UPDATE_TAG_VAL) || (type == TDMT_VND_ALTER_CONFIRM) || (type == TDMT_VND_COMMIT);
(type == TDMT_VND_UPDATE_TAG_VAL) || (type == TDMT_VND_ALTER_CONFIRM) || (type == TDMT_VND_COMMIT) ||
(type == TDMT_SYNC_CONFIG_CHANGE);
}

static inline bool syncUtilUserCommit(tmsg_t msgType) {
Expand Down Expand Up @@ -1400,6 +1401,7 @@ typedef struct {
int64_t numOfBatchInsertReqs;
int64_t numOfBatchInsertSuccessReqs;
int32_t numOfCachedTables;
int32_t learnerProgress; // use one reservered
} SVnodeLoad;

typedef struct {
Expand Down Expand Up @@ -1541,6 +1543,7 @@ typedef struct {
int8_t learnerReplica;
int8_t learnerSelfIndex;
SReplica learnerReplicas[TSDB_MAX_LEARNER_REPLICA];
int32_t changeVersion;
} SCreateVnodeReq;

int32_t tSerializeSCreateVnodeReq(void* buf, int32_t bufLen, SCreateVnodeReq* pReq);
Expand Down Expand Up @@ -1615,7 +1618,8 @@ typedef struct {
int8_t learnerSelfIndex;
int8_t learnerReplica;
SReplica learnerReplicas[TSDB_MAX_LEARNER_REPLICA];
} SAlterVnodeReplicaReq, SAlterVnodeTypeReq;
int32_t changeVersion;
} SAlterVnodeReplicaReq, SAlterVnodeTypeReq, SCheckLearnCatchupReq;

int32_t tSerializeSAlterVnodeReplicaReq(void* buf, int32_t bufLen, SAlterVnodeReplicaReq* pReq);
int32_t tDeserializeSAlterVnodeReplicaReq(void* buf, int32_t bufLen, SAlterVnodeReplicaReq* pReq);
Expand All @@ -1633,7 +1637,8 @@ typedef struct {
int32_t dstVgId;
uint32_t hashBegin;
uint32_t hashEnd;
int64_t reserved;
int32_t changeVersion;
int32_t reserved;
} SAlterVnodeHashRangeReq;

int32_t tSerializeSAlterVnodeHashRangeReq(void* buf, int32_t bufLen, SAlterVnodeHashRangeReq* pReq);
Expand Down
1 change: 1 addition & 0 deletions include/common/tmsgdef.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_DND_MAX_MSG, "dnd-max", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_DND_ALTER_MNODE_TYPE, "dnode-alter-mnode-type", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_DND_ALTER_VNODE_TYPE, "dnode-alter-vnode-type", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_DND_CHECK_VNODE_LEARNER_CATCHUP, "dnode-check-vnode-learner-catchup", NULL, NULL)

TD_NEW_MSG_SEG(TDMT_MND_MSG)
TD_DEF_MSG_TYPE(TDMT_MND_CONNECT, "connect", NULL, NULL)
Expand Down
23 changes: 14 additions & 9 deletions include/libs/sync/sync.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ typedef struct SSyncCfg {
int32_t myIndex;
SNodeInfo nodeInfo[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA];
SyncIndex lastIndex;
int32_t changeVersion;
} SSyncCfg;

typedef struct SFsmCbMeta {
Expand Down Expand Up @@ -239,20 +240,22 @@ typedef struct SSyncState {
ESyncState state;
bool restored;
bool canRead;
int32_t progress;
SyncTerm term;
int64_t roleTimeMs;
int64_t startTimeMs;
} SSyncState;

int32_t syncInit();
void syncCleanUp();
int64_t syncOpen(SSyncInfo* pSyncInfo);
int32_t syncStart(int64_t rid);
void syncStop(int64_t rid);
void syncPreStop(int64_t rid);
void syncPostStop(int64_t rid);
int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak, int64_t* seq);
int32_t syncIsCatchUp(int64_t rid);
int32_t syncInit();
void syncCleanUp();
int64_t syncOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion);
int32_t syncStart(int64_t rid);
void syncStop(int64_t rid);
void syncPreStop(int64_t rid);
void syncPostStop(int64_t rid);
int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak, int64_t* seq);
int32_t syncCheckMember(int64_t rid);
int32_t syncIsCatchUp(int64_t rid);
ESyncRole syncGetRole(int64_t rid);
int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg);
int32_t syncReconfig(int64_t rid, SSyncCfg* pCfg);
Expand All @@ -270,6 +273,8 @@ SSyncState syncGetState(int64_t rid);
void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet);
const char* syncStr(ESyncState state);

int32_t syncNodeGetConfig(int64_t rid, SSyncCfg *cfg);

#ifdef __cplusplus
}
#endif
Expand Down
20 changes: 15 additions & 5 deletions source/common/src/tmsg.c
Original file line number Diff line number Diff line change
Expand Up @@ -1082,7 +1082,7 @@ int32_t tSerializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) {
if (tEncodeI64(&encoder, pload->compStorage) < 0) return -1;
if (tEncodeI64(&encoder, pload->pointsWritten) < 0) return -1;
if (tEncodeI32(&encoder, pload->numOfCachedTables) < 0) return -1;
if (tEncodeI32(&encoder, reserved) < 0) return -1;
if (tEncodeI32(&encoder, pload->learnerProgress) < 0) return -1;
if (tEncodeI64(&encoder, pload->roleTimeMs) < 0) return -1;
if (tEncodeI64(&encoder, pload->startTimeMs) < 0) return -1;
}
Expand Down Expand Up @@ -1163,7 +1163,7 @@ int32_t tDeserializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) {
for (int32_t i = 0; i < vlen; ++i) {
SVnodeLoad vload = {0};
vload.syncTerm = -1;
int32_t reserved32 = 0;

if (tDecodeI32(&decoder, &vload.vgId) < 0) return -1;
if (tDecodeI8(&decoder, &vload.syncState) < 0) return -1;
if (tDecodeI8(&decoder, &vload.syncRestore) < 0) return -1;
Expand All @@ -1175,7 +1175,7 @@ int32_t tDeserializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) {
if (tDecodeI64(&decoder, &vload.compStorage) < 0) return -1;
if (tDecodeI64(&decoder, &vload.pointsWritten) < 0) return -1;
if (tDecodeI32(&decoder, &vload.numOfCachedTables) < 0) return -1;
if (tDecodeI32(&decoder, (int32_t *)&reserved32) < 0) return -1;
if (tDecodeI32(&decoder, &vload.learnerProgress) < 0) return -1;
if (tDecodeI64(&decoder, &vload.roleTimeMs) < 0) return -1;
if (tDecodeI64(&decoder, &vload.startTimeMs) < 0) return -1;
if (taosArrayPush(pReq->pVloads, &vload) == NULL) {
Expand Down Expand Up @@ -4348,6 +4348,7 @@ int32_t tSerializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *pR
SReplica *pReplica = &pReq->learnerReplicas[i];
if (tEncodeSReplica(&encoder, pReplica) < 0) return -1;
}
if (tEncodeI32(&encoder, pReq->changeVersion) < 0) return -1;

tEndEncode(&encoder);

Expand Down Expand Up @@ -4434,6 +4435,9 @@ int32_t tDeserializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *
if (tDecodeSReplica(&decoder, pReplica) < 0) return -1;
}
}
if (!tDecodeIsEnd(&decoder)) {
if (tDecodeI32(&decoder, &pReq->changeVersion) < 0) return -1;
}

tEndDecode(&decoder);
tDecoderClear(&decoder);
Expand Down Expand Up @@ -4666,6 +4670,7 @@ int32_t tSerializeSAlterVnodeReplicaReq(void *buf, int32_t bufLen, SAlterVnodeRe
SReplica *pReplica = &pReq->learnerReplicas[i];
if (tEncodeSReplica(&encoder, pReplica) < 0) return -1;
}
if (tEncodeI32(&encoder, pReq->changeVersion) < 0) return -1;
tEndEncode(&encoder);

int32_t tlen = encoder.pos;
Expand Down Expand Up @@ -4697,6 +4702,9 @@ int32_t tDeserializeSAlterVnodeReplicaReq(void *buf, int32_t bufLen, SAlterVnode
if (tDecodeSReplica(&decoder, pReplica) < 0) return -1;
}
}
if (!tDecodeIsEnd(&decoder)){
if (tDecodeI32(&decoder, &pReq->changeVersion) < 0) return -1;
}

tEndDecode(&decoder);
tDecoderClear(&decoder);
Expand Down Expand Up @@ -4740,7 +4748,8 @@ int32_t tSerializeSAlterVnodeHashRangeReq(void *buf, int32_t bufLen, SAlterVnode
if (tEncodeI32(&encoder, pReq->dstVgId) < 0) return -1;
if (tEncodeI32(&encoder, pReq->hashBegin) < 0) return -1;
if (tEncodeI32(&encoder, pReq->hashEnd) < 0) return -1;
if (tEncodeI64(&encoder, pReq->reserved) < 0) return -1;
if (tEncodeI32(&encoder, pReq->changeVersion) < 0) return -1;
if (tEncodeI32(&encoder, pReq->reserved) < 0) return -1;

tEndEncode(&encoder);

Expand All @@ -4758,7 +4767,8 @@ int32_t tDeserializeSAlterVnodeHashRangeReq(void *buf, int32_t bufLen, SAlterVno
if (tDecodeI32(&decoder, &pReq->dstVgId) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->hashBegin) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->hashEnd) < 0) return -1;
if (tDecodeI64(&decoder, &pReq->reserved) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->changeVersion) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->reserved) < 0) return -1;

tEndDecode(&decoder);
tDecoderClear(&decoder);
Expand Down
2 changes: 2 additions & 0 deletions source/dnode/mgmt/mgmt_mnode/src/mmHandle.c
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_DND_CONFIG_DNODE_RSP, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_DND_ALTER_MNODE_TYPE_RSP, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_DND_ALTER_VNODE_TYPE_RSP, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_DND_CHECK_VNODE_LEARNER_CATCHUP_RSP, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_CONFIG_CHANGE_RSP, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;

if (dmSetMgmtHandle(pArray, TDMT_MND_CONNECT, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_ACCT, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
Expand Down
1 change: 1 addition & 0 deletions source/dnode/mgmt/mgmt_vnode/inc/vmInt.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ int32_t vmProcessAlterVnodeReplicaReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t vmProcessDisableVnodeWriteReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t vmProcessAlterHashRangeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t vmProcessCheckLearnCatchupReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);

// vmFile.c
int32_t vmGetVnodeListFromFile(SVnodeMgmt *pMgmt, SWrapperCfg **ppCfgs, int32_t *numOfVnodes);
Expand Down
68 changes: 61 additions & 7 deletions source/dnode/mgmt/mgmt_vnode/src/vmHandle.c
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) {
pCfg->standby = 0;
pCfg->syncCfg.replicaNum = 0;
pCfg->syncCfg.totalReplicaNum = 0;
pCfg->syncCfg.changeVersion = pCreate->changeVersion;

memset(&pCfg->syncCfg.nodeInfo, 0, sizeof(pCfg->syncCfg.nodeInfo));
for (int32_t i = 0; i < pCreate->replica; ++i) {
Expand Down Expand Up @@ -211,14 +212,15 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
", days:%d keep0:%d keep1:%d keep2:%d tsma:%d precision:%d compression:%d minRows:%d maxRows:%d"
", wal fsync:%d level:%d retentionPeriod:%d retentionSize:%" PRId64 " rollPeriod:%d segSize:%" PRId64
", hash method:%d begin:%u end:%u prefix:%d surfix:%d replica:%d selfIndex:%d "
"learnerReplica:%d learnerSelfIndex:%d strict:%d",
"learnerReplica:%d learnerSelfIndex:%d strict:%d changeVersion:%d",
req.vgId, TMSG_INFO(pMsg->msgType), req.pages, req.pageSize, req.buffer, req.pageSize * 1024,
(uint64_t)req.buffer * 1024 * 1024, req.cacheLast, req.cacheLastSize, req.sstTrigger, req.tsdbPageSize,
req.tsdbPageSize * 1024, req.db, req.dbUid, req.daysPerFile, req.daysToKeep0, req.daysToKeep1, req.daysToKeep2,
req.isTsma, req.precision, req.compression, req.minRows, req.maxRows, req.walFsyncPeriod, req.walLevel,
req.walRetentionPeriod, req.walRetentionSize, req.walRollPeriod, req.walSegmentSize, req.hashMethod,
req.hashBegin, req.hashEnd, req.hashPrefix, req.hashSuffix, req.replica, req.selfIndex, req.learnerReplica,
req.learnerSelfIndex, req.strict);
req.learnerSelfIndex, req.strict, req.changeVersion);

for (int32_t i = 0; i < req.replica; ++i) {
dInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", req.vgId, i, req.replicas[i].fqdn, req.replicas[i].port,
req.replicas[i].id);
Expand Down Expand Up @@ -323,6 +325,7 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return code;
}

//alter replica doesn't use this, but restore dnode still use this
int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
SAlterVnodeTypeReq req = {0};
if (tDeserializeSAlterVnodeReplicaReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
Expand Down Expand Up @@ -363,8 +366,8 @@ int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
dInfo("node:%s, catched up leader, continue to process alter-node-type-request", pMgmt->name);

int32_t vgId = req.vgId;
dInfo("vgId:%d, start to alter vnode type replica:%d selfIndex:%d strict:%d", vgId, req.replica, req.selfIndex,
req.strict);
dInfo("vgId:%d, start to alter vnode type replica:%d selfIndex:%d strict:%d changeVersion:%d",
vgId, req.replica, req.selfIndex, req.strict, req.changeVersion);
for (int32_t i = 0; i < req.replica; ++i) {
SReplica *pReplica = &req.replicas[i];
dInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->id);
Expand Down Expand Up @@ -424,7 +427,7 @@ int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
dError("vgId:%d, failed to open vnode at %s since %s", vgId, path, terrstr());
return -1;
}

if (vmOpenVnode(pMgmt, &wrapperCfg, pImpl) != 0) {
dError("vgId:%d, failed to open vnode mgmt since %s", vgId, terrstr());
return -1;
Expand All @@ -440,6 +443,53 @@ int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return 0;
}

int32_t vmProcessCheckLearnCatchupReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
SCheckLearnCatchupReq req = {0};
if (tDeserializeSAlterVnodeReplicaReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}

if(req.learnerReplicas == 0){
req.learnerSelfIndex = -1;
}

dInfo("vgId:%d, vnode management handle msgType:%s, start to process check-learner-catchup-request",
req.vgId, TMSG_INFO(pMsg->msgType));

SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId);
if (pVnode == NULL) {
dError("vgId:%d, failed to alter vnode type since %s", req.vgId, terrstr());
terrno = TSDB_CODE_VND_NOT_EXIST;
return -1;
}

ESyncRole role = vnodeGetRole(pVnode->pImpl);
dInfo("vgId:%d, checking node role:%d", req.vgId, role);
if(role == TAOS_SYNC_ROLE_VOTER){
dError("vgId:%d, failed to alter vnode type since node already is role:%d", req.vgId, role);
terrno = TSDB_CODE_VND_ALREADY_IS_VOTER;
vmReleaseVnode(pMgmt, pVnode);
return -1;
}

dInfo("vgId:%d, checking node catch up", req.vgId);
if(vnodeIsCatchUp(pVnode->pImpl) != 1){
terrno = TSDB_CODE_VND_NOT_CATCH_UP;
vmReleaseVnode(pMgmt, pVnode);
return -1;
}

dInfo("node:%s, catched up leader, continue to process alter-node-type-request", pMgmt->name);

vmReleaseVnode(pMgmt, pVnode);

dInfo("vgId:%d, vnode management handle msgType:%s, end to process check-learner-catchup-request",
req.vgId, TMSG_INFO(pMsg->msgType));

return 0;
}

int32_t vmProcessDisableVnodeWriteReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
SDisableVnodeWriteReq req = {0};
if (tDeserializeSDisableVnodeWriteReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
Expand Down Expand Up @@ -520,6 +570,7 @@ int32_t vmProcessAlterHashRangeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {

dInfo("vgId:%d, open vnode", dstVgId);
SVnode *pImpl = vnodeOpen(dstPath, diskPrimary, pMgmt->pTfs, pMgmt->msgCb);

if (pImpl == NULL) {
dError("vgId:%d, failed to open vnode at %s since %s", dstVgId, dstPath, terrstr());
return -1;
Expand Down Expand Up @@ -559,9 +610,10 @@ int32_t vmProcessAlterVnodeReplicaReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
int32_t vgId = alterReq.vgId;
dInfo(
"vgId:%d,vnode management handle msgType:%s, start to alter vnode replica:%d selfIndex:%d leanerReplica:%d "
"learnerSelfIndex:%d strict:%d",
"learnerSelfIndex:%d strict:%d changeVersion:%d",
vgId, TMSG_INFO(pMsg->msgType), alterReq.replica, alterReq.selfIndex, alterReq.learnerReplica,
alterReq.learnerSelfIndex, alterReq.strict);
alterReq.learnerSelfIndex, alterReq.strict, alterReq.changeVersion);

for (int32_t i = 0; i < alterReq.replica; ++i) {
SReplica *pReplica = &alterReq.replicas[i];
dInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->port);
Expand Down Expand Up @@ -755,6 +807,8 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_VNODE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_VNODE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_DND_ALTER_VNODE_TYPE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_DND_CHECK_VNODE_LEARNER_CATCHUP, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_CONFIG_CHANGE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;

if (dmSetMgmtHandle(pArray, TDMT_SYNC_TIMEOUT_ELECTION, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
Expand Down
1 change: 1 addition & 0 deletions source/dnode/mgmt/mgmt_vnode/src/vmInt.c
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,7 @@ static void *vmOpenVnodeInThread(void *param) {
snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, pCfg->vgId);

SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, pMgmt->msgCb);

if (pImpl == NULL) {
dError("vgId:%d, failed to open vnode by thread:%d since %s", pCfg->vgId, pThread->threadIndex, terrstr());
pThread->failed++;
Expand Down
3 changes: 3 additions & 0 deletions source/dnode/mgmt/mgmt_vnode/src/vmWorker.c
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ static void vmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
case TDMT_DND_ALTER_VNODE_TYPE:
code = vmProcessAlterVnodeTypeReq(pMgmt, pMsg);
break;
case TDMT_DND_CHECK_VNODE_LEARNER_CATCHUP:
code = vmProcessCheckLearnCatchupReq(pMgmt, pMsg);
break;
default:
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
dGError("msg:%p, not processed in vnode-mgmt queue", pMsg);
Expand Down
2 changes: 2 additions & 0 deletions source/dnode/mnode/impl/inc/mndDef.h
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,7 @@ typedef struct {
int64_t roleTimeMs;
int64_t startTimeMs;
ESyncRole nodeRole;
int32_t learnerProgress;
} SVnodeGid;

typedef struct {
Expand All @@ -376,6 +377,7 @@ typedef struct {
SVnodeGid vnodeGid[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA];
void* pTsma;
int32_t numOfCachedTables;
int32_t syncConfChangeVer;
} SVgObj;

typedef struct {
Expand Down
2 changes: 2 additions & 0 deletions source/dnode/mnode/impl/inc/mndVgroup.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb
SArray *pArray);
int32_t mndBuildCompactVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int64_t compactTs,
STimeWindow tw);
int32_t mndBuildRaftAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb, SDbObj *pNewDb, SVgObj *pVgroup,
SArray *pArray);

void *mndBuildCreateVnodeReq(SMnode *, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen);
void *mndBuildDropVnodeReq(SMnode *, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen);
Expand Down

0 comments on commit e3eb0f9

Please sign in to comment.