Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix/TS-3708: check term for role time #22199

Merged
merged 1 commit into from Jul 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions include/common/tmsg.h
Expand Up @@ -1180,6 +1180,8 @@ typedef struct {
typedef struct {
int8_t syncState;
int8_t syncRestore;
int64_t syncTerm;
int64_t roleTimeMs;
} SMnodeLoad;

typedef struct {
Expand Down
42 changes: 22 additions & 20 deletions include/libs/sync/sync.h
Expand Up @@ -239,29 +239,31 @@ typedef struct SSyncState {
ESyncState state;
bool restored;
bool canRead;
SyncTerm term;
int64_t roleTimeMs;
} SSyncState;

int32_t syncInit();
void syncCleanUp();
int64_t syncOpen(SSyncInfo* pSyncInfo);
int32_t syncStart(int64_t rid);
void syncStop(int64_t rid);
void syncPreStop(int64_t rid);
void syncPostStop(int64_t rid);
int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak, int64_t* seq);
int32_t syncIsCatchUp(int64_t rid);
int32_t syncInit();
void syncCleanUp();
int64_t syncOpen(SSyncInfo* pSyncInfo);
int32_t syncStart(int64_t rid);
void syncStop(int64_t rid);
void syncPreStop(int64_t rid);
void syncPostStop(int64_t rid);
int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak, int64_t* seq);
int32_t syncIsCatchUp(int64_t rid);
ESyncRole syncGetRole(int64_t rid);
int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg);
int32_t syncReconfig(int64_t rid, SSyncCfg* pCfg);
int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex);
int32_t syncEndSnapshot(int64_t rid);
int32_t syncLeaderTransfer(int64_t rid);
int32_t syncStepDown(int64_t rid, SyncTerm newTerm);
bool syncIsReadyForRead(int64_t rid);
bool syncSnapshotSending(int64_t rid);
bool syncSnapshotRecving(int64_t rid);
int32_t syncSendTimeoutRsp(int64_t rid, int64_t seq);
int32_t syncForceBecomeFollower(SSyncNode* ths, const SRpcMsg* pRpcMsg);
int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg);
int32_t syncReconfig(int64_t rid, SSyncCfg* pCfg);
int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex);
int32_t syncEndSnapshot(int64_t rid);
int32_t syncLeaderTransfer(int64_t rid);
int32_t syncStepDown(int64_t rid, SyncTerm newTerm);
bool syncIsReadyForRead(int64_t rid);
bool syncSnapshotSending(int64_t rid);
bool syncSnapshotRecving(int64_t rid);
int32_t syncSendTimeoutRsp(int64_t rid, int64_t seq);
int32_t syncForceBecomeFollower(SSyncNode* ths, const SRpcMsg* pRpcMsg);

SSyncState syncGetState(int64_t rid);
void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet);
Expand Down
6 changes: 3 additions & 3 deletions source/common/src/systable.c
Expand Up @@ -33,7 +33,7 @@ static const SSysDbTableSchema dnodesSchema[] = {
{.name = "support_vnodes", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT, .sysInfo = true},
{.name = "status", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = true},
{.name = "reboot_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = true},
{.name = "reboot_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = true},
{.name = "note", .bytes = 256 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
#ifdef TD_ENTERPRISE
{.name = "active_code", .bytes = TSDB_ACTIVE_KEY_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
Expand All @@ -47,7 +47,7 @@ static const SSysDbTableSchema mnodesSchema[] = {
{.name = "role", .bytes = 12 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
{.name = "status", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = true},
{.name = "reboot_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = true},
{.name = "role_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = true},
};

static const SSysDbTableSchema modulesSchema[] = {
Expand All @@ -73,7 +73,7 @@ static const SSysDbTableSchema clusterSchema[] = {
{.name = "name", .bytes = TSDB_CLUSTER_ID_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
{.name = "uptime", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = true},
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = true},
{.name = "version", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
{.name = "version", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
{.name = "expire_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = true},
};

Expand Down
9 changes: 9 additions & 0 deletions source/common/src/tmsg.c
Expand Up @@ -1101,6 +1101,8 @@ int32_t tSerializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) {
if (tEncodeI64(&encoder, pReq->qload.timeInFetchQueue) < 0) return -1;

if (tEncodeI32(&encoder, pReq->statusSeq) < 0) return -1;
if (tEncodeI64(&encoder, pReq->mload.syncTerm) < 0) return -1;
if (tEncodeI64(&encoder, pReq->mload.roleTimeMs) < 0) return -1;
tEndEncode(&encoder);

int32_t tlen = encoder.pos;
Expand Down Expand Up @@ -1183,6 +1185,13 @@ int32_t tDeserializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) {
if (tDecodeI64(&decoder, &pReq->qload.timeInFetchQueue) < 0) return -1;

if (tDecodeI32(&decoder, &pReq->statusSeq) < 0) return -1;

pReq->mload.syncTerm = -1;
pReq->mload.roleTimeMs = 0;
if (!tDecodeIsEnd(&decoder)) {
if (tDecodeI64(&decoder, &pReq->mload.syncTerm) < 0) return -1;
if (tDecodeI64(&decoder, &pReq->mload.roleTimeMs) < 0) return -1;
}
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
Expand Down
3 changes: 2 additions & 1 deletion source/dnode/mnode/impl/inc/mndDef.h
Expand Up @@ -216,8 +216,9 @@ typedef struct {
int64_t createdTime;
int64_t updateTime;
ESyncState syncState;
SyncTerm syncTerm;
bool syncRestore;
int64_t stateStartTime;
int64_t roleTimeMs;
SDnodeObj* pDnode;
int32_t role;
SyncIndex lastIndex;
Expand Down
18 changes: 14 additions & 4 deletions source/dnode/mnode/impl/src/mndDnode.c
Expand Up @@ -524,13 +524,23 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) {

SMnodeObj *pObj = mndAcquireMnode(pMnode, pDnode->id);
if (pObj != NULL) {
if (pObj->syncState != statusReq.mload.syncState || pObj->syncRestore != statusReq.mload.syncRestore) {
mInfo("dnode:%d, mnode syncState from %s to %s, restoreState from %d to %d", pObj->id, syncStr(pObj->syncState),
syncStr(statusReq.mload.syncState), pObj->syncRestore, statusReq.mload.syncRestore);
bool roleChanged = pObj->syncState != statusReq.mload.syncState ||
(statusReq.mload.syncTerm != -1 && pObj->syncTerm != statusReq.mload.syncTerm);
bool restoreChanged = pObj->syncRestore != statusReq.mload.syncRestore;
if (roleChanged || restoreChanged) {
mInfo("dnode:%d, mnode syncState from %s to %s, restoreState from %d to %d, syncTerm from %" PRId64
" to %" PRId64,
pObj->id, syncStr(pObj->syncState), syncStr(statusReq.mload.syncState), pObj->syncRestore,
statusReq.mload.syncRestore, pObj->syncTerm, statusReq.mload.syncTerm);
pObj->syncState = statusReq.mload.syncState;
pObj->syncRestore = statusReq.mload.syncRestore;
pObj->stateStartTime = taosGetTimestampMs();
pObj->syncTerm = statusReq.mload.syncTerm;
}

if (roleChanged) {
pObj->roleTimeMs = (statusReq.mload.roleTimeMs != 0) ? statusReq.mload.roleTimeMs : taosGetTimestampMs();
}

mndReleaseMnode(pMnode, pObj);
}

Expand Down
5 changes: 4 additions & 1 deletion source/dnode/mnode/impl/src/mndMain.c
Expand Up @@ -890,7 +890,10 @@ int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad) {
SSyncState state = syncGetState(pMnode->syncMgmt.sync);
pLoad->syncState = state.state;
pLoad->syncRestore = state.restored;
mTrace("mnode current syncState is %s, syncRestore:%d", syncStr(pLoad->syncState), pLoad->syncRestore);
pLoad->syncTerm = state.term;
pLoad->roleTimeMs = state.roleTimeMs;
mTrace("mnode current syncState is %s, syncRestore:%d, syncTerm:%" PRId64 " ,roleTimeMs:%" PRId64,
syncStr(pLoad->syncState), pLoad->syncRestore, pLoad->syncTerm, pLoad->roleTimeMs);
return 0;
}

Expand Down
31 changes: 25 additions & 6 deletions source/dnode/mnode/impl/src/mndMnode.c
Expand Up @@ -319,7 +319,7 @@ static int32_t mndBuildCreateMnodeRedoAction(STrans *pTrans, SDCreateMnodeReq *p
return 0;
}

static int32_t mndBuildAlterMnodeTypeRedoAction(STrans *pTrans,
static int32_t mndBuildAlterMnodeTypeRedoAction(STrans *pTrans,
SDAlterMnodeTypeReq *pAlterMnodeTypeReq, SEpSet *pAlterMnodeTypeEpSet) {
int32_t contLen = tSerializeSDCreateMnodeReq(NULL, 0, pAlterMnodeTypeReq);
void *pReq = taosMemoryMalloc(contLen);
Expand Down Expand Up @@ -803,9 +803,17 @@ static int32_t mndRetrieveMnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
int32_t numOfRows = 0;
int32_t cols = 0;
SMnodeObj *pObj = NULL;
SMnodeObj *pSelfObj = NULL;
ESdbStatus objStatus = 0;
char *pWrite;
int64_t curMs = taosGetTimestampMs();
int64_t dummyTimeMs = 0;

pSelfObj = sdbAcquire(pSdb, SDB_MNODE, &pMnode->selfDnodeId);
if (pSelfObj == NULL) {
mError("mnode:%d, failed to acquire self %s", pMnode->selfDnodeId, terrstr());
goto _out;
}

while (numOfRows < rows) {
pShow->pIter = sdbFetchAll(pSdb, SDB_MNODE, pShow->pIter, (void **)&pObj, &objStatus, true);
Expand All @@ -825,7 +833,8 @@ static int32_t mndRetrieveMnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
if (pObj->id == pMnode->selfDnodeId) {
snprintf(role, sizeof(role), "%s%s", syncStr(TAOS_SYNC_STATE_LEADER), pMnode->restored ? "" : "*");
}
if (mndIsDnodeOnline(pObj->pDnode, curMs)) {
bool isDnodeOnline = mndIsDnodeOnline(pObj->pDnode, curMs);
if (isDnodeOnline) {
tstrncpy(role, syncStr(pObj->syncState), sizeof(role));
if (pObj->syncState == TAOS_SYNC_STATE_LEADER && pObj->id != pMnode->selfDnodeId) {
tstrncpy(role, syncStr(TAOS_SYNC_STATE_ERROR), sizeof(role));
Expand All @@ -840,7 +849,7 @@ static int32_t mndRetrieveMnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
const char *status = "ready";
if (objStatus == SDB_STATUS_CREATING) status = "creating";
if (objStatus == SDB_STATUS_DROPPING) status = "dropping";
if (!mndIsDnodeOnline(pObj->pDnode, curMs)) status = "offline";
if (!isDnodeOnline) status = "offline";
char b3[9 + VARSTR_HEADER_SIZE] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(b3, status, pShow->pMeta->pSchemas[cols].bytes);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
Expand All @@ -850,14 +859,24 @@ static int32_t mndRetrieveMnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
colDataSetVal(pColInfo, numOfRows, (const char *)&pObj->createdTime, false);

pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)&pObj->stateStartTime, false);
if (pObj->syncTerm != pSelfObj->syncTerm || !isDnodeOnline) {
// state of old term / no status report => use dummyTimeMs
if (pObj->syncTerm > pSelfObj->syncTerm) {
mError("mnode:%d has a newer term:%" PRId64 " than me:%" PRId64, pObj->id, pObj->syncTerm, pSelfObj->syncTerm);
}
colDataSetVal(pColInfo, numOfRows, (const char *)&dummyTimeMs, false);
} else {
colDataSetVal(pColInfo, numOfRows, (const char *)&pObj->roleTimeMs, false);
}

numOfRows++;
sdbRelease(pSdb, pObj);
}

pShow->numOfRows += numOfRows;

_out:
sdbRelease(pSdb, pSelfObj);
return numOfRows;
}

Expand Down Expand Up @@ -999,12 +1018,12 @@ static void mndReloadSyncConfig(SMnode *pMnode) {
}

if (pMnode->syncMgmt.sync > 0) {
mInfo("vgId:1, mnode sync reconfig, totalReplica:%d replica:%d myIndex:%d",
mInfo("vgId:1, mnode sync reconfig, totalReplica:%d replica:%d myIndex:%d",
cfg.totalReplicaNum, cfg.replicaNum, cfg.myIndex);

for (int32_t i = 0; i < cfg.totalReplicaNum; ++i) {
SNodeInfo *pNode = &cfg.nodeInfo[i];
mInfo("vgId:1, index:%d, ep:%s:%u dnode:%d cluster:%" PRId64 " role:%d", i, pNode->nodeFqdn, pNode->nodePort,
mInfo("vgId:1, index:%d, ep:%s:%u dnode:%d cluster:%" PRId64 " role:%d", i, pNode->nodeFqdn, pNode->nodePort,
pNode->nodeId, pNode->clusterId, pNode->nodeRole);
}

Expand Down
2 changes: 1 addition & 1 deletion source/libs/sync/inc/syncInt.h
Expand Up @@ -213,7 +213,7 @@ typedef struct SSyncNode {
int64_t minMatchIndex;

int64_t startTime;
int64_t leaderTime;
int64_t roleTimeMs;
int64_t lastReplicateTime;

int32_t electNum;
Expand Down
11 changes: 8 additions & 3 deletions source/libs/sync/src/syncMain.c
Expand Up @@ -508,12 +508,14 @@ SSyncState syncGetState(int64_t rid) {
SSyncNode* pSyncNode = syncNodeAcquire(rid);
if (pSyncNode != NULL) {
state.state = pSyncNode->state;
state.roleTimeMs = pSyncNode->roleTimeMs;
state.restored = pSyncNode->restoreFinish;
if (pSyncNode->vgId != 1) {
state.canRead = syncNodeIsReadyForRead(pSyncNode);
} else {
state.canRead = state.restored;
}
state.term = raftStoreGetTerm(pSyncNode);
syncNodeRelease(pSyncNode);
}

Expand Down Expand Up @@ -898,6 +900,7 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {

// init TLA+ server vars
pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
pSyncNode->roleTimeMs = taosGetTimestampMs();
if (raftStoreOpen(pSyncNode) != 0) {
sError("vgId:%d, failed to open raft store at path %s", pSyncNode->vgId, pSyncNode->raftStorePath);
goto _error;
Expand Down Expand Up @@ -1035,7 +1038,6 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {

int64_t timeNow = taosGetTimestampMs();
pSyncNode->startTime = timeNow;
pSyncNode->leaderTime = timeNow;
pSyncNode->lastReplicateTime = timeNow;

// snapshotting
Expand Down Expand Up @@ -1131,6 +1133,7 @@ int32_t syncNodeStart(SSyncNode* pSyncNode) {
int32_t syncNodeStartStandBy(SSyncNode* pSyncNode) {
// state change
pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
pSyncNode->roleTimeMs = taosGetTimestampMs();
syncNodeStopHeartbeatTimer(pSyncNode);

// reset elect timer, long enough
Expand Down Expand Up @@ -1667,6 +1670,7 @@ void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) {

// state change
pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
pSyncNode->roleTimeMs = taosGetTimestampMs();
syncNodeStopHeartbeatTimer(pSyncNode);

// trace log
Expand Down Expand Up @@ -1695,6 +1699,7 @@ void syncNodeBecomeLearner(SSyncNode* pSyncNode, const char* debugStr) {

// state change
pSyncNode->state = TAOS_SYNC_STATE_LEARNER;
pSyncNode->roleTimeMs = taosGetTimestampMs();

// trace log
sNTrace(pSyncNode, "become learner %s", debugStr);
Expand Down Expand Up @@ -1730,8 +1735,6 @@ void syncNodeBecomeLearner(SSyncNode* pSyncNode, const char* debugStr) {
// /\ UNCHANGED <<messages, currentTerm, votedFor, candidateVars, logVars>>
//
void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
pSyncNode->leaderTime = taosGetTimestampMs();

pSyncNode->becomeLeaderNum++;
pSyncNode->hbrSlowNum = 0;

Expand All @@ -1740,6 +1743,7 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {

// state change
pSyncNode->state = TAOS_SYNC_STATE_LEADER;
pSyncNode->roleTimeMs = taosGetTimestampMs();

// set leader cache
pSyncNode->leaderCache = pSyncNode->myRaftId;
Expand Down Expand Up @@ -1839,6 +1843,7 @@ int32_t syncNodePeerStateInit(SSyncNode* pSyncNode) {
void syncNodeFollower2Candidate(SSyncNode* pSyncNode) {
ASSERT(pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER);
pSyncNode->state = TAOS_SYNC_STATE_CANDIDATE;
pSyncNode->roleTimeMs = taosGetTimestampMs();
SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
sInfo("vgId:%d, become candidate from follower. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64,
pSyncNode->vgId, raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, lastIndex);
Expand Down