Skip to content

Commit

Permalink
Merge pull request #22199 from taosdata/fix/TS-3708
Browse files Browse the repository at this point in the history
fix/TS-3708: check term for role time
  • Loading branch information
gccgdb1234 committed Jul 27, 2023
2 parents d9fe1a3 + 167ecea commit cc3cc73
Show file tree
Hide file tree
Showing 10 changed files with 90 additions and 39 deletions.
2 changes: 2 additions & 0 deletions include/common/tmsg.h
Expand Up @@ -1180,6 +1180,8 @@ typedef struct {
typedef struct {
int8_t syncState;
int8_t syncRestore;
int64_t syncTerm;
int64_t roleTimeMs;
} SMnodeLoad;

typedef struct {
Expand Down
42 changes: 22 additions & 20 deletions include/libs/sync/sync.h
Expand Up @@ -239,29 +239,31 @@ typedef struct SSyncState {
ESyncState state;
bool restored;
bool canRead;
SyncTerm term;
int64_t roleTimeMs;
} SSyncState;

int32_t syncInit();
void syncCleanUp();
int64_t syncOpen(SSyncInfo* pSyncInfo);
int32_t syncStart(int64_t rid);
void syncStop(int64_t rid);
void syncPreStop(int64_t rid);
void syncPostStop(int64_t rid);
int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak, int64_t* seq);
int32_t syncIsCatchUp(int64_t rid);
int32_t syncInit();
void syncCleanUp();
int64_t syncOpen(SSyncInfo* pSyncInfo);
int32_t syncStart(int64_t rid);
void syncStop(int64_t rid);
void syncPreStop(int64_t rid);
void syncPostStop(int64_t rid);
int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak, int64_t* seq);
int32_t syncIsCatchUp(int64_t rid);
ESyncRole syncGetRole(int64_t rid);
int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg);
int32_t syncReconfig(int64_t rid, SSyncCfg* pCfg);
int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex);
int32_t syncEndSnapshot(int64_t rid);
int32_t syncLeaderTransfer(int64_t rid);
int32_t syncStepDown(int64_t rid, SyncTerm newTerm);
bool syncIsReadyForRead(int64_t rid);
bool syncSnapshotSending(int64_t rid);
bool syncSnapshotRecving(int64_t rid);
int32_t syncSendTimeoutRsp(int64_t rid, int64_t seq);
int32_t syncForceBecomeFollower(SSyncNode* ths, const SRpcMsg* pRpcMsg);
int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg);
int32_t syncReconfig(int64_t rid, SSyncCfg* pCfg);
int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex);
int32_t syncEndSnapshot(int64_t rid);
int32_t syncLeaderTransfer(int64_t rid);
int32_t syncStepDown(int64_t rid, SyncTerm newTerm);
bool syncIsReadyForRead(int64_t rid);
bool syncSnapshotSending(int64_t rid);
bool syncSnapshotRecving(int64_t rid);
int32_t syncSendTimeoutRsp(int64_t rid, int64_t seq);
int32_t syncForceBecomeFollower(SSyncNode* ths, const SRpcMsg* pRpcMsg);

SSyncState syncGetState(int64_t rid);
void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet);
Expand Down
6 changes: 3 additions & 3 deletions source/common/src/systable.c
Expand Up @@ -33,7 +33,7 @@ static const SSysDbTableSchema dnodesSchema[] = {
{.name = "support_vnodes", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT, .sysInfo = true},
{.name = "status", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = true},
{.name = "reboot_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = true},
{.name = "reboot_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = true},
{.name = "note", .bytes = 256 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
#ifdef TD_ENTERPRISE
{.name = "active_code", .bytes = TSDB_ACTIVE_KEY_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
Expand All @@ -47,7 +47,7 @@ static const SSysDbTableSchema mnodesSchema[] = {
{.name = "role", .bytes = 12 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
{.name = "status", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = true},
{.name = "reboot_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = true},
{.name = "role_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = true},
};

static const SSysDbTableSchema modulesSchema[] = {
Expand All @@ -73,7 +73,7 @@ static const SSysDbTableSchema clusterSchema[] = {
{.name = "name", .bytes = TSDB_CLUSTER_ID_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
{.name = "uptime", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = true},
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = true},
{.name = "version", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
{.name = "version", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
{.name = "expire_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = true},
};

Expand Down
9 changes: 9 additions & 0 deletions source/common/src/tmsg.c
Expand Up @@ -1101,6 +1101,8 @@ int32_t tSerializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) {
if (tEncodeI64(&encoder, pReq->qload.timeInFetchQueue) < 0) return -1;

if (tEncodeI32(&encoder, pReq->statusSeq) < 0) return -1;
if (tEncodeI64(&encoder, pReq->mload.syncTerm) < 0) return -1;
if (tEncodeI64(&encoder, pReq->mload.roleTimeMs) < 0) return -1;
tEndEncode(&encoder);

int32_t tlen = encoder.pos;
Expand Down Expand Up @@ -1183,6 +1185,13 @@ int32_t tDeserializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) {
if (tDecodeI64(&decoder, &pReq->qload.timeInFetchQueue) < 0) return -1;

if (tDecodeI32(&decoder, &pReq->statusSeq) < 0) return -1;

pReq->mload.syncTerm = -1;
pReq->mload.roleTimeMs = 0;
if (!tDecodeIsEnd(&decoder)) {
if (tDecodeI64(&decoder, &pReq->mload.syncTerm) < 0) return -1;
if (tDecodeI64(&decoder, &pReq->mload.roleTimeMs) < 0) return -1;
}
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
Expand Down
3 changes: 2 additions & 1 deletion source/dnode/mnode/impl/inc/mndDef.h
Expand Up @@ -216,8 +216,9 @@ typedef struct {
int64_t createdTime;
int64_t updateTime;
ESyncState syncState;
SyncTerm syncTerm;
bool syncRestore;
int64_t stateStartTime;
int64_t roleTimeMs;
SDnodeObj* pDnode;
int32_t role;
SyncIndex lastIndex;
Expand Down
18 changes: 14 additions & 4 deletions source/dnode/mnode/impl/src/mndDnode.c
Expand Up @@ -524,13 +524,23 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) {

SMnodeObj *pObj = mndAcquireMnode(pMnode, pDnode->id);
if (pObj != NULL) {
if (pObj->syncState != statusReq.mload.syncState || pObj->syncRestore != statusReq.mload.syncRestore) {
mInfo("dnode:%d, mnode syncState from %s to %s, restoreState from %d to %d", pObj->id, syncStr(pObj->syncState),
syncStr(statusReq.mload.syncState), pObj->syncRestore, statusReq.mload.syncRestore);
bool roleChanged = pObj->syncState != statusReq.mload.syncState ||
(statusReq.mload.syncTerm != -1 && pObj->syncTerm != statusReq.mload.syncTerm);
bool restoreChanged = pObj->syncRestore != statusReq.mload.syncRestore;
if (roleChanged || restoreChanged) {
mInfo("dnode:%d, mnode syncState from %s to %s, restoreState from %d to %d, syncTerm from %" PRId64
" to %" PRId64,
pObj->id, syncStr(pObj->syncState), syncStr(statusReq.mload.syncState), pObj->syncRestore,
statusReq.mload.syncRestore, pObj->syncTerm, statusReq.mload.syncTerm);
pObj->syncState = statusReq.mload.syncState;
pObj->syncRestore = statusReq.mload.syncRestore;
pObj->stateStartTime = taosGetTimestampMs();
pObj->syncTerm = statusReq.mload.syncTerm;
}

if (roleChanged) {
pObj->roleTimeMs = (statusReq.mload.roleTimeMs != 0) ? statusReq.mload.roleTimeMs : taosGetTimestampMs();
}

mndReleaseMnode(pMnode, pObj);
}

Expand Down
5 changes: 4 additions & 1 deletion source/dnode/mnode/impl/src/mndMain.c
Expand Up @@ -890,7 +890,10 @@ int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad) {
SSyncState state = syncGetState(pMnode->syncMgmt.sync);
pLoad->syncState = state.state;
pLoad->syncRestore = state.restored;
mTrace("mnode current syncState is %s, syncRestore:%d", syncStr(pLoad->syncState), pLoad->syncRestore);
pLoad->syncTerm = state.term;
pLoad->roleTimeMs = state.roleTimeMs;
mTrace("mnode current syncState is %s, syncRestore:%d, syncTerm:%" PRId64 " ,roleTimeMs:%" PRId64,
syncStr(pLoad->syncState), pLoad->syncRestore, pLoad->syncTerm, pLoad->roleTimeMs);
return 0;
}

Expand Down
31 changes: 25 additions & 6 deletions source/dnode/mnode/impl/src/mndMnode.c
Expand Up @@ -319,7 +319,7 @@ static int32_t mndBuildCreateMnodeRedoAction(STrans *pTrans, SDCreateMnodeReq *p
return 0;
}

static int32_t mndBuildAlterMnodeTypeRedoAction(STrans *pTrans,
static int32_t mndBuildAlterMnodeTypeRedoAction(STrans *pTrans,
SDAlterMnodeTypeReq *pAlterMnodeTypeReq, SEpSet *pAlterMnodeTypeEpSet) {
int32_t contLen = tSerializeSDCreateMnodeReq(NULL, 0, pAlterMnodeTypeReq);
void *pReq = taosMemoryMalloc(contLen);
Expand Down Expand Up @@ -803,9 +803,17 @@ static int32_t mndRetrieveMnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
int32_t numOfRows = 0;
int32_t cols = 0;
SMnodeObj *pObj = NULL;
SMnodeObj *pSelfObj = NULL;
ESdbStatus objStatus = 0;
char *pWrite;
int64_t curMs = taosGetTimestampMs();
int64_t dummyTimeMs = 0;

pSelfObj = sdbAcquire(pSdb, SDB_MNODE, &pMnode->selfDnodeId);
if (pSelfObj == NULL) {
mError("mnode:%d, failed to acquire self %s", pMnode->selfDnodeId, terrstr());
goto _out;
}

while (numOfRows < rows) {
pShow->pIter = sdbFetchAll(pSdb, SDB_MNODE, pShow->pIter, (void **)&pObj, &objStatus, true);
Expand All @@ -825,7 +833,8 @@ static int32_t mndRetrieveMnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
if (pObj->id == pMnode->selfDnodeId) {
snprintf(role, sizeof(role), "%s%s", syncStr(TAOS_SYNC_STATE_LEADER), pMnode->restored ? "" : "*");
}
if (mndIsDnodeOnline(pObj->pDnode, curMs)) {
bool isDnodeOnline = mndIsDnodeOnline(pObj->pDnode, curMs);
if (isDnodeOnline) {
tstrncpy(role, syncStr(pObj->syncState), sizeof(role));
if (pObj->syncState == TAOS_SYNC_STATE_LEADER && pObj->id != pMnode->selfDnodeId) {
tstrncpy(role, syncStr(TAOS_SYNC_STATE_ERROR), sizeof(role));
Expand All @@ -840,7 +849,7 @@ static int32_t mndRetrieveMnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
const char *status = "ready";
if (objStatus == SDB_STATUS_CREATING) status = "creating";
if (objStatus == SDB_STATUS_DROPPING) status = "dropping";
if (!mndIsDnodeOnline(pObj->pDnode, curMs)) status = "offline";
if (!isDnodeOnline) status = "offline";
char b3[9 + VARSTR_HEADER_SIZE] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(b3, status, pShow->pMeta->pSchemas[cols].bytes);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
Expand All @@ -850,14 +859,24 @@ static int32_t mndRetrieveMnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
colDataSetVal(pColInfo, numOfRows, (const char *)&pObj->createdTime, false);

pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)&pObj->stateStartTime, false);
if (pObj->syncTerm != pSelfObj->syncTerm || !isDnodeOnline) {
// state of old term / no status report => use dummyTimeMs
if (pObj->syncTerm > pSelfObj->syncTerm) {
mError("mnode:%d has a newer term:%" PRId64 " than me:%" PRId64, pObj->id, pObj->syncTerm, pSelfObj->syncTerm);
}
colDataSetVal(pColInfo, numOfRows, (const char *)&dummyTimeMs, false);
} else {
colDataSetVal(pColInfo, numOfRows, (const char *)&pObj->roleTimeMs, false);
}

numOfRows++;
sdbRelease(pSdb, pObj);
}

pShow->numOfRows += numOfRows;

_out:
sdbRelease(pSdb, pSelfObj);
return numOfRows;
}

Expand Down Expand Up @@ -999,12 +1018,12 @@ static void mndReloadSyncConfig(SMnode *pMnode) {
}

if (pMnode->syncMgmt.sync > 0) {
mInfo("vgId:1, mnode sync reconfig, totalReplica:%d replica:%d myIndex:%d",
mInfo("vgId:1, mnode sync reconfig, totalReplica:%d replica:%d myIndex:%d",
cfg.totalReplicaNum, cfg.replicaNum, cfg.myIndex);

for (int32_t i = 0; i < cfg.totalReplicaNum; ++i) {
SNodeInfo *pNode = &cfg.nodeInfo[i];
mInfo("vgId:1, index:%d, ep:%s:%u dnode:%d cluster:%" PRId64 " role:%d", i, pNode->nodeFqdn, pNode->nodePort,
mInfo("vgId:1, index:%d, ep:%s:%u dnode:%d cluster:%" PRId64 " role:%d", i, pNode->nodeFqdn, pNode->nodePort,
pNode->nodeId, pNode->clusterId, pNode->nodeRole);
}

Expand Down
2 changes: 1 addition & 1 deletion source/libs/sync/inc/syncInt.h
Expand Up @@ -213,7 +213,7 @@ typedef struct SSyncNode {
int64_t minMatchIndex;

int64_t startTime;
int64_t leaderTime;
int64_t roleTimeMs;
int64_t lastReplicateTime;

int32_t electNum;
Expand Down
11 changes: 8 additions & 3 deletions source/libs/sync/src/syncMain.c
Expand Up @@ -508,12 +508,14 @@ SSyncState syncGetState(int64_t rid) {
SSyncNode* pSyncNode = syncNodeAcquire(rid);
if (pSyncNode != NULL) {
state.state = pSyncNode->state;
state.roleTimeMs = pSyncNode->roleTimeMs;
state.restored = pSyncNode->restoreFinish;
if (pSyncNode->vgId != 1) {
state.canRead = syncNodeIsReadyForRead(pSyncNode);
} else {
state.canRead = state.restored;
}
state.term = raftStoreGetTerm(pSyncNode);
syncNodeRelease(pSyncNode);
}

Expand Down Expand Up @@ -898,6 +900,7 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {

// init TLA+ server vars
pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
pSyncNode->roleTimeMs = taosGetTimestampMs();
if (raftStoreOpen(pSyncNode) != 0) {
sError("vgId:%d, failed to open raft store at path %s", pSyncNode->vgId, pSyncNode->raftStorePath);
goto _error;
Expand Down Expand Up @@ -1035,7 +1038,6 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {

int64_t timeNow = taosGetTimestampMs();
pSyncNode->startTime = timeNow;
pSyncNode->leaderTime = timeNow;
pSyncNode->lastReplicateTime = timeNow;

// snapshotting
Expand Down Expand Up @@ -1131,6 +1133,7 @@ int32_t syncNodeStart(SSyncNode* pSyncNode) {
int32_t syncNodeStartStandBy(SSyncNode* pSyncNode) {
// state change
pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
pSyncNode->roleTimeMs = taosGetTimestampMs();
syncNodeStopHeartbeatTimer(pSyncNode);

// reset elect timer, long enough
Expand Down Expand Up @@ -1667,6 +1670,7 @@ void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) {

// state change
pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
pSyncNode->roleTimeMs = taosGetTimestampMs();
syncNodeStopHeartbeatTimer(pSyncNode);

// trace log
Expand Down Expand Up @@ -1695,6 +1699,7 @@ void syncNodeBecomeLearner(SSyncNode* pSyncNode, const char* debugStr) {

// state change
pSyncNode->state = TAOS_SYNC_STATE_LEARNER;
pSyncNode->roleTimeMs = taosGetTimestampMs();

// trace log
sNTrace(pSyncNode, "become learner %s", debugStr);
Expand Down Expand Up @@ -1730,8 +1735,6 @@ void syncNodeBecomeLearner(SSyncNode* pSyncNode, const char* debugStr) {
// /\ UNCHANGED <<messages, currentTerm, votedFor, candidateVars, logVars>>
//
void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
pSyncNode->leaderTime = taosGetTimestampMs();

pSyncNode->becomeLeaderNum++;
pSyncNode->hbrSlowNum = 0;

Expand All @@ -1740,6 +1743,7 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {

// state change
pSyncNode->state = TAOS_SYNC_STATE_LEADER;
pSyncNode->roleTimeMs = taosGetTimestampMs();

// set leader cache
pSyncNode->leaderCache = pSyncNode->myRaftId;
Expand Down Expand Up @@ -1839,6 +1843,7 @@ int32_t syncNodePeerStateInit(SSyncNode* pSyncNode) {
void syncNodeFollower2Candidate(SSyncNode* pSyncNode) {
ASSERT(pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER);
pSyncNode->state = TAOS_SYNC_STATE_CANDIDATE;
pSyncNode->roleTimeMs = taosGetTimestampMs();
SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
sInfo("vgId:%d, become candidate from follower. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64,
pSyncNode->vgId, raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, lastIndex);
Expand Down

0 comments on commit cc3cc73

Please sign in to comment.