Skip to content

Commit

Permalink
Merge pull request #20314 from taosdata/FIX/TD-22983-main
Browse files Browse the repository at this point in the history
enh: keep extra raft Logs before minimum match index
  • Loading branch information
guanshengliang committed Mar 8, 2023
2 parents 4c69835 + eca97bf commit 7268009
Show file tree
Hide file tree
Showing 5 changed files with 16 additions and 10 deletions.
2 changes: 1 addition & 1 deletion source/libs/sync/inc/syncReplication.h
Expand Up @@ -51,7 +51,7 @@ int32_t syncNodeHeartbeatPeers(SSyncNode* pSyncNode);
int32_t syncNodeSendHeartbeat(SSyncNode* pSyncNode, const SRaftId* pDestId, SRpcMsg* pMsg);

int32_t syncNodeReplicate(SSyncNode* pSyncNode);
int32_t syncNodeReplicateOne(SSyncNode* pSyncNode, SRaftId* pDestId, bool snapshot);
int32_t syncNodeReplicateReset(SSyncNode* pSyncNode, SRaftId* pDestId);
int32_t syncNodeReplicateWithoutLock(SSyncNode* pNode);

int32_t syncNodeSendAppendEntries(SSyncNode* pNode, const SRaftId* destRaftId, SRpcMsg* pRpcMsg);
Expand Down
2 changes: 1 addition & 1 deletion source/libs/sync/src/syncMain.c
Expand Up @@ -301,7 +301,7 @@ int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) {
syncNodeRelease(pSyncNode);
return 0;
}
logRetention = TMAX(logRetention, lastApplyIndex - pSyncNode->minMatchIndex);
logRetention = TMAX(logRetention, lastApplyIndex - pSyncNode->minMatchIndex + logRetention);
}

_DEL_WAL:
Expand Down
3 changes: 2 additions & 1 deletion source/libs/sync/src/syncPipeline.c
Expand Up @@ -742,7 +742,8 @@ int32_t syncLogReplMgrProcessReplyAsRecovery(SSyncLogReplMgr* pMgr, SSyncNode* p

if (pMsg->matchIndex < pNode->pLogBuf->matchIndex) {
term = syncLogReplMgrGetPrevLogTerm(pMgr, pNode, index + 1);
if (term < 0 || (term != pMsg->lastMatchTerm && (index + 1 == firstVer || index == firstVer))) {
if ((index + 1 < firstVer) || (term < 0) ||
(term != pMsg->lastMatchTerm && (index + 1 == firstVer || index == firstVer))) {
ASSERT(term >= 0 || terrno == TSDB_CODE_WAL_LOG_NOT_EXIST);
if (syncNodeStartSnapshot(pNode, &destId) < 0) {
sError("vgId:%d, failed to start snapshot for peer dnode:%d", pNode->vgId, DID(&destId));
Expand Down
9 changes: 9 additions & 0 deletions source/libs/sync/src/syncReplication.c
Expand Up @@ -48,6 +48,15 @@

int32_t syncNodeMaybeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, SRpcMsg* pRpcMsg);

/*
 * Reset the log replication manager associated with the peer pDestId.
 *
 * The lookup of the manager and its reset are both performed while the
 * node's log buffer mutex is held. The results of the lock/unlock calls
 * are not inspected.
 *
 * pNode    sync node whose log buffer mutex guards the operation
 * pDestId  raft id used to look up the replication manager
 *
 * Always returns 0.
 */
int32_t syncNodeReplicateReset(SSyncNode* pNode, SRaftId* pDestId) {
  SSyncLogBuffer* pLogBuf = pNode->pLogBuf;

  taosThreadMutexLock(&pLogBuf->mutex);
  /* Look up the peer's manager and reset it in one step, under the lock. */
  syncLogReplMgrReset(syncNodeGetLogReplMgr(pNode, pDestId));
  taosThreadMutexUnlock(&pLogBuf->mutex);

  return 0;
}

int32_t syncNodeReplicate(SSyncNode* pNode) {
SSyncLogBuffer* pBuf = pNode->pLogBuf;
taosThreadMutexLock(&pBuf->mutex);
Expand Down
10 changes: 3 additions & 7 deletions source/libs/sync/src/syncSnapshot.c
Expand Up @@ -992,8 +992,7 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
if (pMsg->ack == SYNC_SNAPSHOT_SEQ_END) {
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq end");
snapshotSenderStop(pSender, true);
SSyncLogReplMgr *pMgr = syncNodeGetLogReplMgr(pSyncNode, &pMsg->srcId);
syncLogReplMgrReset(pMgr);
syncNodeReplicateReset(pSyncNode, &pMsg->srcId);
return 0;
}

Expand All @@ -1018,17 +1017,14 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "receive error ack");
sSError(pSender, "snapshot sender receive error ack:%d, my seq:%d", pMsg->ack, pSender->seq);
snapshotSenderStop(pSender, true);
SSyncLogReplMgr *pMgr = syncNodeGetLogReplMgr(pSyncNode, &pMsg->srcId);
syncLogReplMgrReset(pMgr);
syncNodeReplicateReset(pSyncNode, &pMsg->srcId);
return -1;
}

return 0;

_ERROR:
snapshotSenderStop(pSender, true);
SSyncLogReplMgr *pMgr = syncNodeGetLogReplMgr(pSyncNode, &pMsg->srcId);
syncLogReplMgrReset(pMgr);

syncNodeReplicateReset(pSyncNode, &pMsg->srcId);
return -1;
}

0 comments on commit 7268009

Please sign in to comment.