-
Notifications
You must be signed in to change notification settings - Fork 1
/
peer.go
148 lines (130 loc) · 3.68 KB
/
peer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
package paxos
import (
"sort"
"github.com/LiuzhouChan/go-paxos/config"
"github.com/LiuzhouChan/go-paxos/internal/utils/stringutil"
"github.com/LiuzhouChan/go-paxos/paxospb"
)
// PeerAddress is the basic info for a peer in the paxos group: the node's ID
// paired with its address.
type PeerAddress struct {
// NodeID is the peer's node ID; bootstrap registers it via addNode.
NodeID uint64
// Address is the peer's address string. It is not read anywhere in this
// file; presumably it is the network address used to reach the peer
// (TODO confirm against callers).
Address string
}
// Peer is the interface struct for interacting with the underlying Paxos
// protocol implementation. Its methods forward to the wrapped instance.
type Peer struct {
// i is the paxos instance that the Peer's methods delegate to.
i *instance
// prevState is the baseline that HasUpdate and GetUpdate compare the
// instance's current paxos state against. It is set by LaunchPeer and
// replaced in Commit whenever the committed Update carries a non-empty State.
prevState paxospb.State
}
// LaunchPeer starts or restarts a Paxos node: it creates the paxos instance,
// registers every peer in addresses with it, and wraps it in a Peer.
//
// initial is currently only logged. When newNode is false the instance is
// reset for a new paxos instance after the peers have been registered. The
// returned Peer's previous state is emptyState when logdb reports a last
// instance ID of 0, and the instance's current paxos state otherwise. The
// returned error is always nil.
func LaunchPeer(config *config.Config, logdb ILogDB,
	addresses []PeerAddress, initial bool, newNode bool) (*Peer, error) {
	inst := newInstance(config, logdb)

	_, lastInstance := logdb.GetRange()
	plog.Infof("LaunchPeer, lastInstanceID %d, initial %t, newNode %t",
		lastInstance, initial, newNode)

	// Config change is not implemented yet, so the peers are bootstrapped
	// unconditionally instead of only when initial && newNode.
	bootstrap(inst, addresses)

	if !newNode {
		// Per the original note, the log has been replayed at this point, so
		// the instance only has to be reset for a new paxos instance.
		plog.Infof("it is not a new node, reset it for new instance")
		inst.resetForNewInstance()
	}

	// Start from the empty state and override it only when the log already
	// holds at least one instance.
	p := &Peer{i: inst, prevState: emptyState}
	if lastInstance != 0 {
		p.prevState = inst.paxosState()
	}
	return p, nil
}
// Tick advances the logical clock by one tick by handing a LocalTick message
// to the underlying instance.
func (p *Peer) Tick() {
	tick := paxospb.PaxosMsg{
		MsgType:           paxospb.LocalTick,
		RejectByPromiseID: 0,
	}
	p.i.Handle(tick)
}
// Propose hands a Propose message for (key, value) to the underlying
// instance, with this node's ID as the sender. The message carries the result
// of stringutil.BytesDeepCopy(value) rather than the caller's slice itself.
func (p *Peer) Propose(key uint64, value []byte) {
	inst := p.i
	proposal := paxospb.PaxosMsg{
		MsgType: paxospb.Propose,
		From:    inst.nodeID,
		Key:     key,
		Value:   stringutil.BytesDeepCopy(value),
	}
	inst.Handle(proposal)
}
// Handle forwards msg to the underlying instance, but only when msg.From is a
// key of the instance's remotes map; a message from any other sender is
// dropped without further processing.
func (p *Peer) Handle(msg paxospb.PaxosMsg) {
	if _, known := p.i.remotes[msg.From]; !known {
		return
	}
	p.i.Handle(msg)
}
// HasUpdate reports whether there is any Update ready to be processed. That
// is the case when any of the following holds, checked in this order:
//   - the instance's paxos state is non-empty and differs from prevState;
//   - the instance has queued messages;
//   - moreEntriesToApply is set and the log has entries to apply.
func (p *Peer) HasUpdate(moreEntriesToApply bool) bool {
	current := p.i.paxosState()
	stateChanged := !paxospb.IsEmptyState(current) &&
		!paxospb.IsStateEqual(current, p.prevState)
	return stateChanged ||
		len(p.i.msgs) > 0 ||
		(moreEntriesToApply && p.i.log.hasEntriesToApply())
}
// Commit marks the Update ud as processed: it clears the instance's queued
// messages, adopts ud.State as the new previous state when that state is
// non-empty, and passes ud.UpdateCommit to the log's commitUpdate.
func (p *Peer) Commit(ud paxospb.Update) {
	inst := p.i
	inst.msgs = nil
	if st := ud.State; !paxospb.IsEmptyState(st) {
		p.prevState = st
	}
	inst.log.commitUpdate(ud.UpdateCommit)
}
// NotifyPaxosLastApplied passes lastApplied on to the underlying instance's
// setApplied.
func (p *Peer) NotifyPaxosLastApplied(lastApplied uint64) {
	inst := p.i
	inst.setApplied(lastApplied)
}
// GetUpdate builds the Update for the Peer's instance, measured against the
// Peer's previous state. Committed entries are included only when
// moreEntriesToApply is true.
func (p *Peer) GetUpdate(moreEntriesToApply bool) paxospb.Update {
	inst, prev := p.i, p.prevState
	return getUpdate(inst, prev, moreEntriesToApply)
}
// getUpdateCommit derives the UpdateCommit for ud. AppliedTo is the instance
// ID of the last committed entry in ud; when ud has no committed entries the
// zero-value UpdateCommit is returned.
func getUpdateCommit(ud paxospb.Update) paxospb.UpdateCommit {
	entries := ud.CommittedEntries
	if len(entries) == 0 {
		return paxospb.UpdateCommit{}
	}
	return paxospb.UpdateCommit{
		AppliedTo: entries[len(entries)-1].AcceptorState.InstanceID,
	}
}
// getUpdate assembles the Update for instance i. It always carries the group
// and node IDs, the queued messages and the log's entries to save. Committed
// entries are attached only when moreEntriesToApply is true, and State is set
// only when the instance's paxos state differs from ppst. UpdateCommit is
// derived last, from the assembled Update.
func getUpdate(i *instance, ppst paxospb.State,
	moreEntriesToApply bool) paxospb.Update {
	var ud paxospb.Update
	ud.GroupID = i.groupID
	ud.NodeID = i.nodeID
	ud.Messages = i.msgs
	ud.EntriesToSave = i.log.entriesToSave()

	if moreEntriesToApply {
		ud.CommittedEntries = i.log.getEntriesToApply()
	}

	current := i.paxosState()
	if !paxospb.IsStateEqual(current, ppst) {
		ud.State = current
	}

	// Must run after CommittedEntries is populated: AppliedTo is read from it.
	ud.UpdateCommit = getUpdateCommit(ud)
	return ud
}
// bootstrap registers every peer in addresses with instance i, in ascending
// NodeID order.
//
// The sort runs on a private copy: addresses reaches this function straight
// from the exported LaunchPeer, and sorting it in place would silently
// reorder the caller's slice.
//
// TODO: only used for the first start; add a config-change path later.
func bootstrap(i *instance, addresses []PeerAddress) {
	ordered := make([]PeerAddress, len(addresses))
	copy(ordered, addresses)
	// The index parameters are named a/b so they do not shadow the instance i.
	sort.Slice(ordered, func(a, b int) bool {
		return ordered[a].NodeID < ordered[b].NodeID
	})
	for _, peer := range ordered {
		i.addNode(peer.NodeID)
	}
}