Raft Implementation (MIT 6.824 Lab)

Background

This post describes a simple Raft implementation based on the MIT 6.824 lab exercises, covering three parts: leader election, log replication, and persistence. The heart of the whole thing is Figure 2 of the Raft paper; that figure can be read almost as pseudocode and is the core of the entire protocol.

Leader Election

State Machine

Here, once more, is the conceptual state machine for leader election (the figure from the Raft paper):

First, keep two timeouts straight: electionTimeout and heartbeatTimeout.

  • Every follower sets a randomized electionTimeout when it initializes; if that deadline passes, it becomes a candidate and sends out vote requests.
  • The leader has to keep up a heartbeat, sending one every heartbeatTimeout.

The state transitions map directly onto Figure 2; here we only look at the part relevant to leader election.

Follower:

  • Responds to vote requests from candidates and heartbeats from the leader
  • If no heartbeat or vote request arrives within electionTimeout, it converts to candidate

Candidate:

  • On becoming a candidate it must:
    • Increment its own currentTerm
    • Vote for itself
    • Reset its electionTimeout
    • Send vote requests to the other nodes
  • If it receives votes from a majority, it becomes leader
  • If it receives a heartbeat, it steps down to follower
  • If electionTimeout expires, it retries the election

Leader:

  • Sends a heartbeat every heartbeatTimeout

The Raft struct:

type Raft struct {
	mu        sync.RWMutex        // Lock to protect shared access to this peer's state
	peers     []*labrpc.ClientEnd // RPC end points of all peers
	persister *Persister          // Object to hold this peer's persisted state
	me        int                 // this peer's index into peers[]
	dead      int32               // set by Kill()

	// Your data here (2A, 2B, 2C).
	// Look at the paper's Figure 2 for a description of what
	// state a Raft server must maintain.
	currentTerm int
	votedFor    int
	logs        []LogEntry

	// Volatile state on all servers
	commitIndex int
	lastApplied int

	// Volatile state on leaders
	nextIndex  []int
	matchIndex []int

	voteCount     int
	state         uint64
	granted       chan struct{}
	AppendEntries chan struct{}
	electWin      chan struct{}
	applyCh       chan ApplyMsg
}
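
The state field uses three constants that never appear in the snippets here; a minimal sketch, assuming any iota-style definition whose names match the rest of the code:

// Assumed definitions for the three node states referenced by run() and the
// become* helpers; the exact values do not matter as long as the names match.
const (
	StateFollower uint64 = iota
	StateCandidate
	StateLeader
)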

The electionTimeout in the state machine is randomized to avoid split votes during elections.

func (rf *Raft) run() {
	for {
		// read the current state under the lock to avoid a data race
		rf.mu.RLock()
		state := rf.state
		rf.mu.RUnlock()

		switch state {
		case StateCandidate:
			// vote
			select {
			// election timed out; start over with a new election
			case <-time.After(time.Millisecond * time.Duration(rand.Intn(200)+300)):
				rf.mu.Lock()
				rf.becomeCandidate()
				rf.mu.Unlock()
			case <-rf.AppendEntries:
				rf.mu.Lock()
				rf.becomeFollower("candidate receive heart beat")
				rf.mu.Unlock()
			case <-rf.electWin:
				rf.mu.Lock()
				rf.becomeLeader()
				rf.mu.Unlock()
			}
		case StateFollower:
			select {
			// received a vote request
			case <-rf.granted:
			// received a heartbeat
			case <-rf.AppendEntries:
			case <-time.After(time.Millisecond * time.Duration(rand.Intn(200)+300)):
				rf.mu.Lock()
				rf.becomeCandidate()
				rf.mu.Unlock()
			}
		case StateLeader:
			go rf.sendAppendEntries()
			time.Sleep(time.Millisecond * 100)
		}
	}
}

func (rf *Raft) becomeLeader() {
	rf.debug("changed to Leader, id %d , term %d, logs %v", rf.me, rf.currentTerm, rf.logs)
	rf.state = StateLeader
	rf.nextIndex = make([]int, len(rf.peers))
	rf.matchIndex = make([]int, len(rf.peers))
	for i := range rf.peers {
		rf.nextIndex[i] = rf.getLastIndex() + 1
	}
}

func (rf *Raft) becomeFollower(reason string) {
	rf.debug("changed to Follower, id %d, term %d, reason %s", rf.me, rf.currentTerm, reason)
	rf.state = StateFollower
}

func (rf *Raft) becomeCandidate() {
	rf.debug("changed to Candidate, id %d, term %d, logs %v", rf.me, rf.currentTerm, rf.logs)
	rf.state = StateCandidate
	rf.currentTerm++
	rf.votedFor = rf.me
	rf.voteCount = 1
	rf.persist()
	go rf.sendAllVotesRequests()
}

To make debugging easier, I wrote a small timestamped logger; it makes the output much easier to read.

func (rf *Raft) debug(msg string, a ...interface{}) {
	if debug < 1 {
		return
	}
	selfMsg := fmt.Sprintf(" [me:%d term:%d, state: %d, log: %d] ", rf.me, rf.currentTerm, rf.state, len(rf.logs))
	fmt.Println(strconv.Itoa(int(time.Now().UnixNano())/1000) + selfMsg + fmt.Sprintf(msg, a...))
}
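
The debug switch it checks is just a package-level flag; an assumed minimal version:

// Assumed: set to 1 to turn on the timestamped debug output above.
const debug = 0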

Requesting Votes: RequestVote

RequestVote is the RPC used to ask for votes and can only be initiated by a candidate. Every node in the Raft cluster exposes this handler so it can receive vote requests from candidates.
The candidate sends the request to every other node, and the responses are handled in the same function.

func (rf *Raft) sendAllVotesRequests() {
	rf.mu.Lock()
	// arguments for the vote request
	args := &RequestVoteArgs{}
	args.Term = rf.currentTerm
	args.CandidateId = rf.me
	// args.LastLogIndex = rf.getLastIndex()
	// args.LastLogTerm = rf.getLastTerm()
	rf.mu.Unlock()

	var wg sync.WaitGroup

	for p := range rf.peers {
		if p != rf.me {
			wg.Add(1)
			go func(p int) {
				defer wg.Done()

				ok := rf.sendRequestVote(p, args, &RequestVoteReply{})
				if !ok {
					rf.debug("send request to p: %d, ok: %v", p, ok)
				}

			}(p)
		}
	}
	wg.Wait()

	rf.mu.Lock()
	// all replies are in (or timed out); the vote succeeds if we reached a quorum
	win := rf.voteCount >= len(rf.peers)/2+1
	// make sure the vote request is valid
	if win && args.Term == rf.currentTerm {
		rf.electWin <- struct{}{}
	}
	rf.debug("vote finished, voteCount: %d, win: %v", rf.voteCount, win)
	rf.mu.Unlock()

}

sendRequestVote adds an RPC timeout because the test harness simulates unreachable networks; letting a request hang forever would make the system harder to reason about. The paper says retries can go on indefinitely, but in a real production environment every external RPC call should carry a timeout to avoid leaking resources.

func (rf *Raft) sendRequestVote(server int, args *RequestVoteArgs, reply *RequestVoteReply) bool {
	respCh := make(chan bool)
	ok := false
	go func() {
		respCh <- rf.peers[server].Call("Raft.RequestVote", args, reply)
	}()
	select {
	case <-time.After(time.Millisecond * 60): // 60ms RPC timeout
		return false
	case ok = <-respCh:
	}
	if !ok {
		return false
	}
	rf.mu.Lock()
	defer rf.mu.Unlock()
	defer rf.persist()

	if rf.state != StateCandidate || args.Term != rf.currentTerm {
		return ok
	}
	// the peer has a newer term, so this candidate is stale
	if reply.Term > rf.currentTerm {
		rf.becomeFollower("candidate received larger term")
		rf.currentTerm = reply.Term // adopt the newer term from the reply
		rf.votedFor = -1
	}

	if reply.VoteGranted {
		rf.voteCount++
	}
	return ok
}
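
The receiver-side RequestVote handler itself is not shown above. Here is a minimal sketch following Figure 2, reusing the names from this post; RequestVoteReply is assumed to carry Term and VoteGranted, which is what the sender-side code reads, and the LastLogIndex/LastLogTerm check only matters once log replication (2B) is in place:

// Assumed reply shape, matching the fields read in sendRequestVote.
type RequestVoteReply struct {
	Term        int
	VoteGranted bool
}

// A sketch of the vote handler, not the lab's reference implementation.
func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
	rf.mu.Lock()
	defer rf.mu.Unlock()
	defer rf.persist()

	reply.Term = rf.currentTerm
	// reject candidates from an older term
	if args.Term < rf.currentTerm {
		return
	}
	// a larger term always forces us back to follower
	if args.Term > rf.currentTerm {
		rf.currentTerm = args.Term
		rf.votedFor = -1
		rf.becomeFollower("vote request with larger term")
	}
	// grant the vote if we have not voted for someone else this term and the
	// candidate's log is at least as up to date as ours (election restriction)
	upToDate := args.LastLogTerm > rf.getLastTerm() ||
		(args.LastLogTerm == rf.getLastTerm() && args.LastLogIndex >= rf.getLastIndex())
	if (rf.votedFor == -1 || rf.votedFor == args.CandidateId) && upToDate {
		rf.votedFor = args.CandidateId
		reply.VoteGranted = true
		// tell the main loop a vote was granted so the follower resets its timeout
		rf.granted <- struct{}{}
	}
}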

Maintaining Heartbeats: AppendEntries

Heartbeats are sent at a fixed interval (the heartbeat timeout). The function is called AppendEntries because log data is later synchronized through the same call.

func (rf *Raft) sendAppendEntries() {
	var wg sync.WaitGroup

	rf.mu.RLock()
	for p := range rf.peers {
		if p != rf.me {
			args := &RequestAppendEntriesArgs{}
			// leader's term
			args.Term = rf.currentTerm
			// leader ID
			args.LeaderID = rf.me
			// used for log replication; not needed yet for leader election
			args.PrevLogIndex = rf.nextIndex[p] - 1
			args.LeaderCommit = rf.commitIndex

			if args.PrevLogIndex >= 0 {
				args.PrevLogTerm = rf.logs[args.PrevLogIndex].Term
			}
			// attach entries only if the follower is behind; otherwise this is an empty heartbeat
			if rf.nextIndex[p] <= rf.getLastIndex() {
				args.Entries = rf.logs[rf.nextIndex[p]:]
			}
			//rf.debug("send Entries is: %v, index is: %d", args.Entries, p)
			wg.Add(1)

			go func(p int, args *RequestAppendEntriesArgs) {
				defer wg.Done()
				ok := rf.sendRequestAppendEntries(p, args, &RequestAppendEntriesReply{})
				if !ok {
					rf.debug("send %d AppendEntries result:%v", p, ok)
				}
			}(p, args)
		}
	}
	rf.mu.RUnlock()
	wg.Wait()
}

The leader also attaches a timeout to its outgoing RPCs, which keeps the overall flow easier to control.

func (rf *Raft) sendRequestAppendEntries(server int, args *RequestAppendEntriesArgs, reply *RequestAppendEntriesReply) bool {
	respCh := make(chan bool)
	ok := false
	go func() {
		respCh <- rf.peers[server].Call("Raft.RequestAppendEntries", args, reply)
	}()

	select {
	case <-time.After(time.Millisecond * 60): // 60ms RPC timeout
		return false
	case ok = <-respCh:
	}

	rf.mu.Lock()
	defer rf.mu.Unlock()
	if !ok || rf.state != StateLeader || args.Term != rf.currentTerm {
		return ok
	}
	if reply.Term > rf.currentTerm {
		rf.becomeFollower("leader expired")
		rf.currentTerm = reply.Term
		rf.persist()
		return ok
	}
	return ok
}

And the handling flow on the receiving side:

func (rf *Raft) RequestAppendEntries(args *RequestAppendEntriesArgs, reply *RequestAppendEntriesReply) {
	rf.mu.Lock()
	defer rf.mu.Unlock()
	defer rf.persist()

	if args.Term < rf.currentTerm {
		reply.Term = rf.currentTerm
		return
	}
	// signal on the channel that a heartbeat arrived, resetting the election timeout
	rf.AppendEntries <- struct{}{}
	if args.Term > rf.currentTerm {
		rf.currentTerm = args.Term
		if rf.state != StateFollower {
			rf.becomeFollower("request append receive large term")
			rf.votedFor = -1
		}
	}
	reply.Success = true
}

Log Replication

Log replication is probably the most complex part of the whole lab, so let's briefly review what the paper says first.

Once a leader has been elected it starts serving client requests, each carrying a command to be executed by the replicated state machines. The leader appends the command to its log as a new entry and sends AppendEntries RPCs to the other servers. When the leader determines that an entry is safely replicated (a majority of replicas have written it to their logs), it applies the entry to its state machine and returns the result to the client. If a follower crashes, runs slowly, or loses packets, the leader keeps sending AppendEntries RPCs to that follower until the logs converge.

Only once a log entry is committed may the leader apply it to its state machine. Raft guarantees that a committed log entry is durable and will eventually be executed by every node.

Therefore a mechanism is needed for the leader and followers to agree on the log. The leader maintains a nextIndex for every follower: the index of the next log entry it will send to that follower, initialized to one past the leader's last entry. The leader sends an AppendEntries RPC carrying (term_id, nextIndex-1), where term_id is the term of the entry at slot nextIndex-1. On receipt, the follower checks whether such an entry exists in its own log; if not, it replies with a rejection, the leader decrements nextIndex, and the process repeats until an AppendEntries RPC is accepted.

For example, with nextIndex initialized to 11, the leader sends b AppendEntriesRPC(6, 10); b finds no entry with term_id 6 at slot 10 of its log and replies with a rejection. The leader decrements nextIndex to 10 and sends AppendEntriesRPC(6, 9); again b finds no entry with term_id 6 at slot 9. This continues until the leader sends AppendEntriesRPC(4, 4), b finds an entry with term_id 4 at slot 4, and accepts the message. From then on the leader can push log entries to b starting from slot 5.

Compared with leader election, Figure 2 adds a few more fields; here is what they mean:

// excerpt of the Raft struct, showing only the log-replication fields
type Raft struct {
	logs        []LogEntry
	commitIndex int
	lastApplied int

	nextIndex  []int
	matchIndex []int
	applyCh    chan ApplyMsg
}
  • commitIndex is the highest log index this node knows to be committed
  • lastApplied is the index of the last entry applied to the state machine
  • nextIndex is an array that only matters on the leader: for each peer, the index at which the next log sync should start, initialized to just past the end of the leader's log
  • matchIndex records, per peer, the highest log index known to be replicated there; it starts at 0 and is likewise only meaningful on the leader
  • applyCh is the channel on which entries are applied once they have been committed (its ApplyMsg type is sketched below)
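
ApplyMsg itself comes with the lab skeleton; for the 2B tests it looks roughly like this (snapshot-related fields from later lab parts omitted):

// Provided by the lab skeleton; reproduced here only for reference.
type ApplyMsg struct {
	CommandValid bool
	Command      interface{}
	CommandIndex int
}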

The log can be defined as a []LogEntry; the Command field carries whatever the lab hands us, and that completes the log definition.

type LogEntry struct {
	Term    int
	Command interface{}
}
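
The getLastIndex and getLastTerm helpers used throughout are not shown in the post; with the log kept as a plain slice whose slot 0 holds a placeholder entry (which is how the rest of the code indexes it), they are one-liners. A sketch under that assumption:

// Assumed helpers: rf.logs[0] is a placeholder, so the last real index is
// simply the last slice position.
func (rf *Raft) getLastIndex() int {
	return len(rf.logs) - 1
}

func (rf *Raft) getLastTerm() int {
	return rf.logs[len(rf.logs)-1].Term
}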

RequestVoteArgs gains LastLogIndex and LastLogTerm, which the receiver uses to check whether the candidate's log is at least as up to date as its own.

type RequestVoteArgs struct {
	Term        int // candidate's term (already present for 2A)
	CandidateId int // candidate requesting the vote (already present for 2A)

	// newly added for log replication
	LastLogIndex int
	LastLogTerm  int
}

RequestAppendEntriesArgs changes as well:

type RequestAppendEntriesArgs struct {
	// Your data here (2A, 2B).
	Term     int // leader's term (already present for 2A)
	LeaderID int // leader's ID (already present for 2A)

	// newly added for log replication
	PrevLogIndex int
	PrevLogTerm  int
	Entries      []LogEntry
	LeaderCommit int
}

PrevLogIndex is the leader's nextIndex entry for that peer minus one, and PrevLogTerm is the term of the log entry at that index.
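
The reply side is not spelled out in the post; judging from the fields the sender reads back (Term, Success, RetryIndex), it presumably looks like this:

// Assumed reply shape, matching what sendRequestAppendEntries reads.
type RequestAppendEntriesReply struct {
	Term       int  // follower's currentTerm, so the leader can step down
	Success    bool // true if the follower accepted the entries
	RetryIndex int  // hint for fast nextIndex backoff on rejection
}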

Accepting Writes: Start

The only thing to note here is that, unlike the Raft paper, the lab's Start returns immediately after the local append; it does not wait for the entry to be replicated to the other peers.

func (rf *Raft) Start(command interface{}) (int, int, bool) {
	rf.mu.Lock()
	defer rf.mu.Unlock()
	defer rf.persist()
	if rf.state != StateLeader {
		return 0, 0, false
	}

	index := rf.getLastIndex() + 1
	term := rf.currentTerm
	isLeader := true

	// Your code here (2B).
	// append to current logs
	rf.logs = append(rf.logs, LogEntry{term, command})
	rf.debug("receive start command, logs is :%v", rf.logs)
	return index, term, isLeader
}

Sending Logs: AppendEntries

Compared with the leader-election version, the new AppendEntries also synchronizes the log. The main work is building PrevLogIndex and Entries; if Entries is empty, the call is simply a heartbeat.

func (rf *Raft) sendAppendEntries() {
	var wg sync.WaitGroup

	rf.mu.RLock()
	for p := range rf.peers {
		if p != rf.me {
			args := &RequestAppendEntriesArgs{}
			args.Term = rf.currentTerm
			args.LeaderID = rf.me
			args.PrevLogIndex = rf.nextIndex[p] - 1
			args.LeaderCommit = rf.commitIndex

			if args.PrevLogIndex >= 0 {
				args.PrevLogTerm = rf.logs[args.PrevLogIndex].Term
			}
			// attach entries only if the follower is behind; otherwise send an empty heartbeat
			if rf.nextIndex[p] <= rf.getLastIndex() {
				args.Entries = rf.logs[rf.nextIndex[p]:]
			}
			//rf.debug("send Entries is: %v, index is: %d", args.Entries, p)
			wg.Add(1)

			go func(p int, args *RequestAppendEntriesArgs) {
				defer wg.Done()
				ok := rf.sendRequestAppendEntries(p, args, &RequestAppendEntriesReply{})
				if !ok {
					rf.debug("send %d AppendEntries result:%v", p, ok)
				}
			}(p, args)
		}
	}
	rf.mu.RUnlock()
	wg.Wait()
}

Now the sending logic itself: after every successful reply the leader tries to advance the commit point and updates its local rf.nextIndex and rf.matchIndex. RetryIndex is an optimization, covered with the next function.

func (rf *Raft) sendRequestAppendEntries(server int, args *RequestAppendEntriesArgs, reply *RequestAppendEntriesReply) bool {
	respCh := make(chan bool)
	ok := false
	go func() {
		respCh <- rf.peers[server].Call("Raft.RequestAppendEntries", args, reply)
	}()

	select {
	case <-time.After(time.Millisecond * 60): // 60ms RPC timeout
		return false
	case ok = <-respCh:
	}

	rf.mu.Lock()
	defer rf.mu.Unlock()
	if !ok || rf.state != StateLeader || args.Term != rf.currentTerm {
		return ok
	}
	if reply.Term > rf.currentTerm {
		rf.becomeFollower("leader expired")
		rf.currentTerm = reply.Term
		rf.persist()
		return ok
	}
	//rf.debug("rf matchIndex is %v", rf.matchIndex)
	if reply.Success {

		rf.matchIndex[server] = args.PrevLogIndex + len(args.Entries)
		//rf.debug("reply success, server is %d, matchIndex is %d", server, rf.matchIndex[server])
		rf.nextIndex[server] = rf.matchIndex[server] + 1

		go rf.commit()
	} else {
		rf.nextIndex[server] = reply.RetryIndex
	}

	return ok
}

The commit logic is simple too: walk backwards from the end of the leader's log and, for each index, count the peers whose matchIndex has reached it; once a majority (including the leader itself) has, the entry is considered safely replicated and gets committed.

func (rf *Raft) commit() {
	// commit runs in its own goroutine, so it has to take the lock itself
	rf.mu.Lock()
	defer rf.mu.Unlock()

	majority := len(rf.peers)/2 + 1

	for i := rf.getLastIndex(); i > rf.commitIndex; i-- {
		count := 1
		if rf.logs[i].Term == rf.currentTerm {
			for j := range rf.peers {
				if j == rf.me {
					continue
				}
				// this peer has replicated the leader's log up to index i
				if rf.matchIndex[j] >= i {
					count++
				}
			}
		}
		
		if count >= majority {
			rf.commitIndex = i
			go rf.applyLog()
			break
		}
	}
}

func (rf *Raft) applyLog() {
	rf.mu.Lock()
	defer rf.mu.Unlock()
	// apply changes
	for i := rf.lastApplied + 1; i <= rf.commitIndex; i++ {
		msg := ApplyMsg{CommandIndex: i, Command: rf.logs[i].Command, CommandValid: true}
		rf.debug("send msg is: %v, lastApplied is %d, commitIndex is %d", msg, rf.lastApplied, rf.commitIndex)
		rf.applyCh <- msg
	}

	rf.lastApplied = rf.commitIndex
}

On the receiving side, the retry index is used as an optimization. Once a request is accepted, conflicting log entries are overwritten by truncating to rf.logs[:args.PrevLogIndex+1] and appending the leader's entries, after which the commit index is advanced.

func (rf *Raft) RequestAppendEntries(args *RequestAppendEntriesArgs, reply *RequestAppendEntriesReply) {
	rf.mu.Lock()
	defer rf.mu.Unlock()
	defer rf.persist()

	if args.Term < rf.currentTerm {
		reply.Term = rf.currentTerm
		return
	}

	rf.AppendEntries <- struct{}{}
	if args.Term > rf.currentTerm {
		rf.currentTerm = args.Term
		if rf.state != StateFollower {
			rf.becomeFollower("request append receive large term")
			rf.votedFor = -1
		}
	}

	// which means the request need to decrease the index and send request again
	if args.PrevLogIndex > rf.getLastIndex() {
		reply.RetryIndex = rf.getLastIndex() + 1
		return
	}
	// RetryIndex is an optimization: the paper decrements nextIndex by one
	// per rejection, while here we skip straight past the conflicting term,
	// cutting down the number of round trips
	if args.PrevLogIndex > 0 && rf.logs[args.PrevLogIndex].Term != args.PrevLogTerm {
		reply.RetryIndex = args.PrevLogIndex - 1
		for reply.RetryIndex > 0 && rf.logs[reply.RetryIndex].Term == rf.logs[args.PrevLogIndex].Term {
			reply.RetryIndex--
		}
		return
	}
	rf.logs = append(rf.logs[:args.PrevLogIndex+1], args.Entries...)
	//rf.debug("args.LeaderCommit is :%d, PrevLogIndex %d, commitIndex: %d", args.LeaderCommit, args.PrevLogIndex, rf.commitIndex)
	if args.LeaderCommit > rf.commitIndex {
		rf.commitIndex = min(rf.getLastIndex(), args.LeaderCommit)
		go rf.applyLog()
	}
	reply.Success = true
}
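
The min used above when clamping commitIndex is not a built-in for int in the Go versions the lab targets (before Go 1.21); a trivial helper:

// Assumed helper for clamping commitIndex to the local log length.
func min(a, b int) int {
	if a < b {
		return a
	}
	return b
}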

Persistence

According to the paper, three pieces of state need to be persisted: currentTerm, votedFor, and log[].

That means the state must be persisted every time any of these three fields of the Raft struct changes. Both persist and readPersist are straightforward.

func (rf *Raft) persist() {
	// Your code here (2C).
	// Example:
	w := new(bytes.Buffer)
	e := labgob.NewEncoder(w)
	e.Encode(rf.currentTerm)
	e.Encode(rf.votedFor)
	e.Encode(rf.logs)
	data := w.Bytes()
	rf.persister.SaveRaftState(data)
}

//
// restore previously persisted state.
//
func (rf *Raft) readPersist(data []byte) {
	if data == nil || len(data) < 1 { // bootstrap without any state?
		return
	}
	r := bytes.NewBuffer(data)
	d := labgob.NewDecoder(r)
	d.Decode(&rf.currentTerm)
	d.Decode(&rf.votedFor)
	d.Decode(&rf.logs)
}

As for where persist has to be called: once the previous two parts are in place, adding the calls is easy, so I won't paste that code here.
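
One place worth sketching anyway is startup, where readPersist gets wired in. This is a rough sketch under the standard lab Make signature, not the post's actual code; the field initialization (placeholder log entry, buffered notification channels so the RPC handlers never block while holding the lock) is an assumption:

func Make(peers []*labrpc.ClientEnd, me int,
	persister *Persister, applyCh chan ApplyMsg) *Raft {
	rf := &Raft{
		peers:         peers,
		persister:     persister,
		me:            me,
		applyCh:       applyCh,
		state:         StateFollower,
		votedFor:      -1,
		logs:          []LogEntry{{Term: 0}}, // assumed placeholder at index 0
		granted:       make(chan struct{}, 64),
		AppendEntries: make(chan struct{}, 64),
		electWin:      make(chan struct{}, 64),
	}

	// restore state that survived a crash (currentTerm, votedFor, logs)
	rf.readPersist(persister.ReadRaftState())

	// drive the state machine
	go rf.run()
	return rf
}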

Summary

Debugging the tests was honestly confusing at first. It pays to read the test code carefully and to include timestamps plus the node's own state in the debug output, which makes things much easier to follow. The implementation itself isn't much code; the real gem is the test code, which goes from simulated partitions to split votes to node network failures. It is well worth reading if you are interested.

