Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enh: remove unused functions in sync #20954

Merged
merged 1 commit into from Apr 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 0 additions & 2 deletions source/libs/sync/inc/syncCommit.h
Expand Up @@ -48,8 +48,6 @@ extern "C" {
void syncOneReplicaAdvance(SSyncNode* pSyncNode);
void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode);

bool syncAgreeIndex(SSyncNode* pSyncNode, SRaftId* pRaftId, SyncIndex index);
bool syncAgree(SSyncNode* pSyncNode, SyncIndex index);
bool syncNodeAgreedUpon(SSyncNode* pNode, SyncIndex index);

int64_t syncNodeUpdateCommitIndex(SSyncNode* ths, SyncIndex commitIndex);
Expand Down
1 change: 0 additions & 1 deletion source/libs/sync/inc/syncReplication.h
Expand Up @@ -55,7 +55,6 @@ int32_t syncNodeReplicateReset(SSyncNode* pSyncNode, SRaftId* pDestId);
int32_t syncNodeReplicateWithoutLock(SSyncNode* pNode);

int32_t syncNodeSendAppendEntries(SSyncNode* pNode, const SRaftId* destRaftId, SRpcMsg* pRpcMsg);
int32_t syncNodeMaybeSendAppendEntries(SSyncNode* pNode, const SRaftId* destRaftId, SRpcMsg* pRpcMsg);

#ifdef __cplusplus
}
Expand Down
13 changes: 1 addition & 12 deletions source/libs/sync/src/syncAppendEntries.c
Expand Up @@ -89,17 +89,6 @@
// /\ UNCHANGED <<candidateVars, leaderVars>>
//

SSyncRaftEntry* syncBuildRaftEntryFromAppendEntries(const SyncAppendEntries* pMsg) {
SSyncRaftEntry* pEntry = taosMemoryMalloc(pMsg->dataLen);
if (pEntry == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
(void)memcpy(pEntry, pMsg->data, pMsg->dataLen);
ASSERT(pEntry->bytes == pMsg->dataLen);
return pEntry;
}

int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
SyncAppendEntries* pMsg = pRpcMsg->pCont;
SRpcMsg rpcRsp = {0};
Expand Down Expand Up @@ -146,7 +135,7 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
goto _IGNORE;
}

pEntry = syncBuildRaftEntryFromAppendEntries(pMsg);
pEntry = syncEntryBuildFromAppendEntries(pMsg);
if (pEntry == NULL) {
sError("vgId:%d, failed to get raft entry from append entries since %s", ths->vgId, terrstr());
goto _IGNORE;
Expand Down
29 changes: 0 additions & 29 deletions source/libs/sync/src/syncCommit.c
Expand Up @@ -44,22 +44,6 @@
// /\ UNCHANGED <<messages, serverVars, candidateVars, leaderVars, log>>
//

bool syncAgreeIndex(SSyncNode* pSyncNode, SRaftId* pRaftId, SyncIndex index) {
// I am leader, I agree
if (syncUtilSameId(pRaftId, &(pSyncNode->myRaftId)) && pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
return true;
}

// follower agree
SyncIndex matchIndex = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, pRaftId);
if (matchIndex >= index) {
return true;
}

// not agree
return false;
}

static inline int64_t syncNodeAbs64(int64_t a, int64_t b) {
ASSERT(a >= 0);
ASSERT(b >= 0);
Expand All @@ -85,19 +69,6 @@ bool syncNodeAgreedUpon(SSyncNode* pNode, SyncIndex index) {
return count >= pNode->quorum;
}

bool syncAgree(SSyncNode* pNode, SyncIndex index) {
int agreeCount = 0;
for (int i = 0; i < pNode->replicaNum; ++i) {
if (syncAgreeIndex(pNode, &(pNode->replicasId[i]), index)) {
++agreeCount;
}
if (agreeCount >= pNode->quorum) {
return true;
}
}
return false;
}

int64_t syncNodeUpdateCommitIndex(SSyncNode* ths, SyncIndex commitIndex) {
SyncIndex lastVer = ths->pLogStore->syncLogLastIndex(ths->pLogStore);
commitIndex = TMAX(commitIndex, ths->commitIndex);
Expand Down
9 changes: 6 additions & 3 deletions source/libs/sync/src/syncRaftEntry.c
Expand Up @@ -64,10 +64,13 @@ SSyncRaftEntry* syncEntryBuildFromRpcMsg(const SRpcMsg* pMsg, SyncTerm term, Syn
}

SSyncRaftEntry* syncEntryBuildFromAppendEntries(const SyncAppendEntries* pMsg) {
SSyncRaftEntry* pEntry = syncEntryBuild((int32_t)(pMsg->dataLen));
if (pEntry == NULL) return NULL;

SSyncRaftEntry* pEntry = taosMemoryMalloc(pMsg->dataLen);
if (pEntry == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
memcpy(pEntry, pMsg->data, pMsg->dataLen);
ASSERT(pEntry->bytes == pMsg->dataLen);
return pEntry;
}

Expand Down
16 changes: 0 additions & 16 deletions source/libs/sync/src/syncReplication.c
Expand Up @@ -46,8 +46,6 @@
// mdest |-> j])
// /\ UNCHANGED <<serverVars, candidateVars, leaderVars, logVars>>

int32_t syncNodeMaybeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, SRpcMsg* pRpcMsg);

int32_t syncNodeReplicateReset(SSyncNode* pNode, SRaftId* pDestId) {
SSyncLogBuffer* pBuf = pNode->pLogBuf;
taosThreadMutexLock(&pBuf->mutex);
Expand Down Expand Up @@ -86,20 +84,6 @@ int32_t syncNodeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftI
return 0;
}

int32_t syncNodeMaybeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, SRpcMsg* pRpcMsg) {
int32_t ret = 0;
SyncAppendEntries* pMsg = pRpcMsg->pCont;

if (syncNodeNeedSendAppendEntries(pSyncNode, destRaftId, pMsg)) {
ret = syncNodeSendAppendEntries(pSyncNode, destRaftId, pRpcMsg);
} else {
sNTrace(pSyncNode, "do not repcate to dnode:%d for index:%" PRId64, DID(destRaftId), pMsg->prevLogIndex + 1);
rpcFreeCont(pRpcMsg->pCont);
}

return ret;
}

int32_t syncNodeSendHeartbeat(SSyncNode* pSyncNode, const SRaftId* destId, SRpcMsg* pMsg) {
return syncNodeSendMsgById(destId, pSyncNode, pMsg);
}
Expand Down