-
Notifications
You must be signed in to change notification settings - Fork 0
/
raft_appendentries.go
117 lines (105 loc) · 3.37 KB
/
raft_appendentries.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
package raft
// AppendEntriesArgs is the request payload of the Raft.AppendEntries RPC.
type AppendEntriesArgs struct {
	// Term is the sender's term; the handler rejects the request when it is
	// lower than the receiver's current term.
	// LeaderId identifies the sender (not read by the handler in this file;
	// presumably the sender's peer index, confirm against the caller).
	Term, LeaderId int
	// PrevLogIndex and PrevLogTerm identify the log entry that immediately
	// precedes Entries; the receiver must hold an entry with that index and
	// term for the request to succeed.
	PrevLogIndex, PrevLogTerm int
	// Entries are the log entries to store after PrevLogIndex; may be empty.
	Entries []LogEntry
	// LeaderCommit is the sender's commit index, used by the receiver to
	// advance its own commit index.
	LeaderCommit int
}
// AppendEntriesReply is the response payload of the Raft.AppendEntries RPC.
type AppendEntriesReply struct {
	// Term is the term reported back to the sender: the receiver's current
	// term when the request is rejected as stale, otherwise the request's term.
	Term int
	// Success is true when the receiver's log matched PrevLogIndex/PrevLogTerm
	// and the entries were accepted.
	Success bool
	// ConflictTerm and ConflictIndex are back-off hints filled in when the
	// previous-entry check fails. ConflictTerm is the term of the receiver's
	// entry at PrevLogIndex, or -1 when the receiver has no such entry;
	// ConflictIndex is the index the receiver suggests the sender retry from.
	ConflictTerm, ConflictIndex int
}
// AppendEntries is the RPC handler for Raft.AppendEntries (log replication
// and heartbeats). It holds rf.mu for the whole call.
//
// Outcomes written to reply:
//   - args.Term < rf.currentTerm: Success=false, Term=rf.currentTerm, and the
//     handler returns without touching any other state.
//   - previous-entry check fails: Success=false, Term=args.Term, and
//     ConflictTerm/ConflictIndex carry a back-off hint for the sender.
//   - previous-entry check passes: Success=true, Term=args.Term; any entries
//     not already present are stored (persisting the log) and the commit
//     index may advance, queuing newly committed commands for the applier.
//
// For every request that is not stale, the peer also becomes (or stays) a
// follower in args.Term and its election timer is reset.
func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
	rf.mu.Lock()
	defer rf.mu.Unlock()
	DebugReceiveAppendEntries(rf, args)
	defer DebugAfterReceiveAppendEntries(rf, args, reply)

	// A request from an older term is rejected outright; report our term so
	// the sender can update itself.
	if args.Term < rf.currentTerm {
		reply.Success = false
		reply.Term = rf.currentTerm
		return
	}

	// From here on args.Term >= rf.currentTerm, so every reply carries args.Term.
	reply.Term = args.Term
	lastIndex := rf.getLastIndex()
	firstIndex := rf.getFirstIndex()

	if args.PrevLogIndex < firstIndex {
		// The previous entry lies before our first retained index (i.e. it is
		// covered by a snapshot). A delayed or duplicated request can cause
		// this, so answer with a well-formed rejection instead of leaving the
		// reply zeroed: ask the sender to retry right after our first index.
		Debug(dError, "S%d, PrevlogIndex %d is in the snapshot! %d", rf.me, args.PrevLogIndex, firstIndex)
		reply.Success = false
		reply.ConflictTerm = -1
		reply.ConflictIndex = firstIndex + 1
	} else if args.PrevLogIndex > lastIndex {
		// Our log is too short to contain the previous entry. Back-off hint as
		// described in https://thesquareplanet.com/blog/students-guide-to-raft/
		reply.Success = false
		reply.ConflictTerm = -1
		reply.ConflictIndex = lastIndex + 1
	} else if localTerm := rf.getTermForIndex(args.PrevLogIndex); localTerm != args.PrevLogTerm {
		// We hold an entry at PrevLogIndex but with a different term. Report
		// that term together with the earliest index of the contiguous run of
		// entries carrying it, so the sender can skip the whole run at once.
		// The scan stops at firstIndex+1: the entry at firstIndex itself is
		// never offered as a retry point.
		reply.Success = false
		reply.ConflictTerm = localTerm
		idx := args.PrevLogIndex
		for idx > firstIndex+1 && rf.getTermForIndex(idx-1) == localTerm {
			idx--
		}
		reply.ConflictIndex = idx
	} else {
		// Previous entry matches.
		reply.Success = true

		// Count the leading entries of the request that we already store with
		// the same term; only the remainder needs to be written.
		matched := 0
		for matched < len(args.Entries) {
			idx := args.PrevLogIndex + 1 + matched
			if idx > lastIndex || rf.getTermForIndex(idx) != args.Entries[matched].Term {
				break
			}
			matched++
		}
		if matched < len(args.Entries) {
			// Drop everything after the last matching entry (a conflicting or
			// missing suffix) and append the unmatched entries. When all
			// entries already match, the log is left alone so an out-of-date
			// request cannot truncate newer entries.
			keep := args.PrevLogIndex + matched - firstIndex + 1
			rf.log = append(rf.log[:keep], args.Entries[matched:]...)
			rf.persist()
		}

		// Advance the commit index, but never past the last entry this request
		// has verified (PrevLogIndex + len(Entries)): anything beyond that in
		// our log has not been confirmed to match the sender's log. The
		// comparison with the current value also guards against out-of-date
		// requests carrying a smaller commit index.
		lastNewIndex := args.PrevLogIndex + len(args.Entries)
		newCommit := min(args.LeaderCommit, lastNewIndex)
		if newCommit > rf.commitIdx {
			oldCommit := rf.commitIdx
			rf.commitIdx = newCommit
			// Queue the newly committed commands and wake the applier.
			for i := oldCommit + 1; i <= newCommit; i++ {
				rf.commitQueue = append(rf.commitQueue, ApplyMsg{
					CommandValid: true,
					CommandIndex: i,
					Command:      rf.getCommand(i),
				})
			}
			rf.cv.Broadcast()
		}
	}

	// Step down if the sender's term is newer, or if the terms are equal but
	// we are not currently a follower.
	if args.Term > rf.currentTerm || rf.roler != FOLLOWER {
		rf.changeToFollower(args.Term, -1)
	}
	// A valid request arrived, so restart the election timeout.
	rf.ResetElectionTimer()
}
// sendAppendEntries invokes the "Raft.AppendEntries" RPC on rf.peers[server]
// with args, letting the callee fill in reply, and returns whatever Call
// reports (presumably whether a response was received; confirm against the
// RPC library's documentation).
func (rf *Raft) sendAppendEntries(server int, args *AppendEntriesArgs, reply *AppendEntriesReply) bool {
	return rf.peers[server].Call("Raft.AppendEntries", args, reply)
}