From b958030b0c97a567364b82a2cea25b652525fbd8 Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Mon, 17 Apr 2023 18:52:09 +0800 Subject: [PATCH] enh: remove unused functions in sync --- source/libs/sync/inc/syncCommit.h | 2 -- source/libs/sync/inc/syncReplication.h | 1 - source/libs/sync/src/syncAppendEntries.c | 13 +---------- source/libs/sync/src/syncCommit.c | 29 ------------------------ source/libs/sync/src/syncRaftEntry.c | 9 +++++--- source/libs/sync/src/syncReplication.c | 16 ------------- 6 files changed, 7 insertions(+), 63 deletions(-) diff --git a/source/libs/sync/inc/syncCommit.h b/source/libs/sync/inc/syncCommit.h index 7d638a7336e..07b4702b1b5 100644 --- a/source/libs/sync/inc/syncCommit.h +++ b/source/libs/sync/inc/syncCommit.h @@ -48,8 +48,6 @@ extern "C" { void syncOneReplicaAdvance(SSyncNode* pSyncNode); void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode); -bool syncAgreeIndex(SSyncNode* pSyncNode, SRaftId* pRaftId, SyncIndex index); -bool syncAgree(SSyncNode* pSyncNode, SyncIndex index); bool syncNodeAgreedUpon(SSyncNode* pNode, SyncIndex index); int64_t syncNodeUpdateCommitIndex(SSyncNode* ths, SyncIndex commitIndex); diff --git a/source/libs/sync/inc/syncReplication.h b/source/libs/sync/inc/syncReplication.h index a55fd7ead33..04456b2454c 100644 --- a/source/libs/sync/inc/syncReplication.h +++ b/source/libs/sync/inc/syncReplication.h @@ -55,7 +55,6 @@ int32_t syncNodeReplicateReset(SSyncNode* pSyncNode, SRaftId* pDestId); int32_t syncNodeReplicateWithoutLock(SSyncNode* pNode); int32_t syncNodeSendAppendEntries(SSyncNode* pNode, const SRaftId* destRaftId, SRpcMsg* pRpcMsg); -int32_t syncNodeMaybeSendAppendEntries(SSyncNode* pNode, const SRaftId* destRaftId, SRpcMsg* pRpcMsg); #ifdef __cplusplus } diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index 9ab545075cf..deae4b0b3fe 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -89,17 +89,6 @@ // /\ UNCHANGED <> // -SSyncRaftEntry* syncBuildRaftEntryFromAppendEntries(const SyncAppendEntries* pMsg) { - SSyncRaftEntry* pEntry = taosMemoryMalloc(pMsg->dataLen); - if (pEntry == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; - } - (void)memcpy(pEntry, pMsg->data, pMsg->dataLen); - ASSERT(pEntry->bytes == pMsg->dataLen); - return pEntry; -} - int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) { SyncAppendEntries* pMsg = pRpcMsg->pCont; SRpcMsg rpcRsp = {0}; @@ -146,7 +135,7 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) { goto _IGNORE; } - pEntry = syncBuildRaftEntryFromAppendEntries(pMsg); + pEntry = syncEntryBuildFromAppendEntries(pMsg); if (pEntry == NULL) { sError("vgId:%d, failed to get raft entry from append entries since %s", ths->vgId, terrstr()); goto _IGNORE; diff --git a/source/libs/sync/src/syncCommit.c b/source/libs/sync/src/syncCommit.c index 2501b4df8b8..01f1f00c8b5 100644 --- a/source/libs/sync/src/syncCommit.c +++ b/source/libs/sync/src/syncCommit.c @@ -44,22 +44,6 @@ // /\ UNCHANGED <> // -bool syncAgreeIndex(SSyncNode* pSyncNode, SRaftId* pRaftId, SyncIndex index) { - // I am leader, I agree - if (syncUtilSameId(pRaftId, &(pSyncNode->myRaftId)) && pSyncNode->state == TAOS_SYNC_STATE_LEADER) { - return true; - } - - // follower agree - SyncIndex matchIndex = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, pRaftId); - if (matchIndex >= index) { - return true; - } - - // not agree - return false; -} - static inline int64_t syncNodeAbs64(int64_t a, int64_t b) { ASSERT(a >= 0); ASSERT(b >= 0); @@ -85,19 +69,6 @@ bool syncNodeAgreedUpon(SSyncNode* pNode, SyncIndex index) { return count >= pNode->quorum; } -bool syncAgree(SSyncNode* pNode, SyncIndex index) { - int agreeCount = 0; - for (int i = 0; i < pNode->replicaNum; ++i) { - if (syncAgreeIndex(pNode, &(pNode->replicasId[i]), index)) { - ++agreeCount; - } - if (agreeCount >= pNode->quorum) { - return true; - } - } - return false; -} - int64_t syncNodeUpdateCommitIndex(SSyncNode* ths, SyncIndex commitIndex) { SyncIndex lastVer = ths->pLogStore->syncLogLastIndex(ths->pLogStore); commitIndex = TMAX(commitIndex, ths->commitIndex); diff --git a/source/libs/sync/src/syncRaftEntry.c b/source/libs/sync/src/syncRaftEntry.c index 3e63e2fb8ea..8f42780eb92 100644 --- a/source/libs/sync/src/syncRaftEntry.c +++ b/source/libs/sync/src/syncRaftEntry.c @@ -64,10 +64,13 @@ SSyncRaftEntry* syncEntryBuildFromRpcMsg(const SRpcMsg* pMsg, SyncTerm term, Syn } SSyncRaftEntry* syncEntryBuildFromAppendEntries(const SyncAppendEntries* pMsg) { - SSyncRaftEntry* pEntry = syncEntryBuild((int32_t)(pMsg->dataLen)); - if (pEntry == NULL) return NULL; - + SSyncRaftEntry* pEntry = taosMemoryMalloc(pMsg->dataLen); + if (pEntry == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } memcpy(pEntry, pMsg->data, pMsg->dataLen); + ASSERT(pEntry->bytes == pMsg->dataLen); return pEntry; } diff --git a/source/libs/sync/src/syncReplication.c b/source/libs/sync/src/syncReplication.c index 8ac9a860e3e..2776225a39d 100644 --- a/source/libs/sync/src/syncReplication.c +++ b/source/libs/sync/src/syncReplication.c @@ -46,8 +46,6 @@ // mdest |-> j]) // /\ UNCHANGED <> -int32_t syncNodeMaybeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, SRpcMsg* pRpcMsg); - int32_t syncNodeReplicateReset(SSyncNode* pNode, SRaftId* pDestId) { SSyncLogBuffer* pBuf = pNode->pLogBuf; taosThreadMutexLock(&pBuf->mutex); @@ -86,20 +84,6 @@ int32_t syncNodeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftI return 0; } -int32_t syncNodeMaybeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, SRpcMsg* pRpcMsg) { - int32_t ret = 0; - SyncAppendEntries* pMsg = pRpcMsg->pCont; - - if (syncNodeNeedSendAppendEntries(pSyncNode, destRaftId, pMsg)) { - ret = syncNodeSendAppendEntries(pSyncNode, destRaftId, pRpcMsg); - } else { - sNTrace(pSyncNode, "do not repcate to dnode:%d for index:%" PRId64, DID(destRaftId), pMsg->prevLogIndex + 1); - rpcFreeCont(pRpcMsg->pCont); - } - - return ret; -} - int32_t syncNodeSendHeartbeat(SSyncNode* pSyncNode, const SRaftId* destId, SRpcMsg* pMsg) { return syncNodeSendMsgById(destId, pSyncNode, pMsg); }