/
PaxosAlgorithm.scala
238 lines (197 loc) · 7.8 KB
/
PaxosAlgorithm.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
package com.github.trex_paxos.library
import scala.collection.immutable.SortedMap
/**
 * A node in a paxos cluster.
 *
 * @param nodeUniqueId   The node unique ID used in the ballot numbers. Assumed to never be recycled.
 * @param role           The role the node currently plays, such as Follower or Leader.
 * @param data           The state of the node holding the paxos algorithm bookkeeping.
 * @param quorumStrategy The quorum strategy in force (which could be any FPaxos flexible paxos strategy).
 */
case class PaxosAgent(
  nodeUniqueId: Int,
  role: PaxosRole,
  data: PaxosData,
  quorumStrategy: QuorumStrategy
) {
  /**
   * A prepare built from this node's unique id, `BallotNumber(0, 0)` and a trailing identifier
   * argument of zero.
   */
  def minPrepare: Prepare = {
    val zeroBallot = BallotNumber(0, 0)
    val zeroIdentifier = Identifier(nodeUniqueId, zeroBallot, 0)
    Prepare(zeroIdentifier)
  }
}
/**
 * The unit of input handed to the algorithm: the IO used to read and write data (side effects),
 * the paxos node, and the paxos message to process.
 *
 * @param io      The side-effecting operations (journal, logger, clock, network send, ...) available
 *                while the message is handled.
 * @param agent   The paxos node that is receiving the message.
 * @param message The paxos message being delivered to the node.
 */
case class PaxosEvent(
  io: PaxosIO,
  agent: PaxosAgent,
  message: PaxosMessage
)
/**
 * Paxos has side effects (writes to the network and read+write to disk) which are isolated into
 * this trait to simplify testing.
 */
trait PaxosIO {
  /**
   * The durable store that holds the state on disk.
   */
  def journal: Journal

  /**
   * A logging adaptor.
   */
  def logger: PaxosLogging

  /**
   * Randomised timeouts.
   */
  def randomTimeout: Long

  /**
   * The current time (so that we can test timeout permutations and behaviours).
   */
  def clock: Long

  /**
   * The callback to the host application which can side effect.
   *
   * @param payload The payload response to the client command value.
   * @return Whatever the host application produces for the payload; the type is not constrained here.
   */
  def deliver(payload: Payload): Any

  /**
   * Send a paxos algorithm message within the cluster. May be deferred.
   *
   * @param msg The message to send.
   */
  def send(msg: PaxosMessage): Unit

  /**
   * Associate a command value with a paxos identifier so that the result of the commit can be
   * routed back to the sender of the command.
   *
   * @param value The command to associate with a message identifier.
   * @param id    The message identifier.
   */
  def associate(value: CommandValue, id: Identifier): Unit

  /**
   * Respond to clients.
   *
   * @param results The results of a fast forward commit or None if leadership was lost such that
   *                the commit outcome is unknown.
   */
  def respond(results: Option[scala.collection.immutable.Map[Identifier, Any]]): Unit
}
/**
 * Shared definitions for the paxos algorithm: the handler function type, a zeroed journal bounds
 * value and the factory for the agent a node starts with.
 */
object PaxosAlgorithm {
  /** A message handler: defined only for the events it deals with, and yielding the resulting agent. */
  type PaxosFunction = PartialFunction[PaxosEvent, PaxosAgent]

  /** Journal bounds with both values set to zero. */
  val minJournalBounds: JournalBounds = JournalBounds(0, 0)

  /**
   * Builds the agent a node starts out with: a `Follower` whose data carries the supplied progress,
   * zero for its two numeric fields, no value for its optional field, and empty prepare-response and
   * accept-response maps sorted by `Ordering.IdentifierLogOrdering`.
   *
   * @param nodeUniqueId The unique id of the node.
   * @param progress     The progress to start from.
   * @param clusterSize  Supplies the cluster size; handed to `DefaultQuorumStrategy`.
   * @return The initial agent.
   */
  def initialAgent(nodeUniqueId: Int, progress: Progress, clusterSize: () => Int): PaxosAgent = {
    val noPrepareResponses =
      SortedMap.empty[Identifier, scala.collection.immutable.Map[Int, PrepareResponse]](Ordering.IdentifierLogOrdering)
    val noAcceptResponses =
      SortedMap.empty[Identifier, AcceptResponsesAndTimeout](Ordering.IdentifierLogOrdering)
    val initialData = PaxosData(progress, 0, 0, noPrepareResponses, None, noAcceptResponses)
    PaxosAgent(nodeUniqueId, Follower, initialData, DefaultQuorumStrategy(clusterSize))
  }
}
/**
 * Dispatches a [[PaxosEvent]] to the handler appropriate for the agent's current role.
 *
 * Each `val` below is a partial function over events. The per-role functions (`followerFunction`,
 * `recovererFunction`, `leaderFunction`) are `orElse` chains, so the first handler in the chain that
 * is defined for an event wins. `apply` selects the chain by role.
 *
 * The declaration order of the `val`s matters: the composed functions reference the ones declared
 * before them.
 */
class PaxosAlgorithm extends PaxosLenses
  with CommitHandler
  with FollowerHandler
  with RetransmitHandler
  with PrepareHandler
  with AcceptHandler
  with PrepareResponseHandler
  with AcceptResponseHandler
  with ResendHandler
  with ReturnToFollowerHandler
  with ClientCommandHandler {

  import PaxosAlgorithm._

  /** Handling that only applies while the agent is a Follower. */
  val followingFunction: PaxosFunction = {
    // update heartbeat and attempt to commit contiguous accept messages
    case PaxosEvent(io, agent@PaxosAgent(_, Follower, _, _), c: Commit) =>
      handleFollowerCommit(io, agent, c)
    // the follower's timeout has been reached
    case PaxosEvent(io, agent@PaxosAgent(_, Follower, PaxosData(_, _, to, _, _, _), _), CheckTimeout) if io.clock >= to =>
      handleFollowerTimeout(io, agent)
    case PaxosEvent(io, agent, vote: PrepareResponse) if agent.role == Follower =>
      handleFollowerPrepareResponse(io, agent, vote)
    // ignore an accept response which may be seen after we backdown to follower
    case PaxosEvent(_, agent@PaxosAgent(_, Follower, _, _), _: AcceptResponse) =>
      agent
  }

  /** Retransmission requests and responses, handled the same way in every role. */
  val retransmissionStateFunction: PaxosFunction = {
    case PaxosEvent(io, agent, rq: RetransmitRequest) =>
      handleRetransmitRequest(io, agent, rq)
    case PaxosEvent(io, agent, rs: RetransmitResponse) =>
      handleRetransmitResponse(io, agent, rs)
  }

  /** Prepare messages. */
  val prepareStateFunction: PaxosFunction = {
    case PaxosEvent(io, agent, p: Prepare) =>
      handlePrepare(io, agent, p)
  }

  /** Accept messages. */
  val acceptStateFunction: PaxosFunction = {
    case PaxosEvent(io, agent, a: Accept) =>
      handleAccept(io, agent, a)
  }

  val ignoreHeartbeatStateFunction: PaxosFunction = {
    // ignore a HeartBeat which has not already been handled
    case PaxosEvent(_, agent, HeartBeat) =>
      agent
  }

  /** Catch-all: logs a warning for any message and leaves the agent unchanged. */
  val unknown: PaxosFunction = {
    case PaxosEvent(io, agent, x) =>
      io.logger.warning("unknown message {}", x)
      agent
  }

  /**
   * If no other logic has caught a timeout then do nothing.
   */
  val ignoreCheckTimeout: PaxosFunction = {
    case PaxosEvent(_, agent, CheckTimeout) =>
      agent
  }

  /** The tail shared by every role's chain; it ends in `unknown`, which accepts any event. */
  val lastFunction: PaxosFunction =
    acceptStateFunction orElse
      prepareStateFunction orElse
      retransmissionStateFunction orElse
      ignoreCheckTimeout orElse
      unknown

  /** Answers a client command with a `NotLeader` message and leaves the agent unchanged. */
  val rejectCommandFunction: PaxosFunction = {
    case PaxosEvent(io, agent, v: CommandValue) =>
      io.send(NotLeader(agent.nodeUniqueId, v.msgUuid))
      agent
  }

  /** Behaviour common to the non-leader roles: ignore heartbeats and reject client commands. */
  val notLeaderFunction: PaxosFunction = ignoreHeartbeatStateFunction orElse rejectCommandFunction

  /** The complete handler chain used when the agent is a Follower. */
  val followerFunction: PaxosFunction = notLeaderFunction orElse followingFunction orElse lastFunction

  /** Prepare responses for an agent that is not a Follower. */
  val takeoverFunction: PaxosFunction = {
    case PaxosEvent(io, agent, vote: PrepareResponse) =>
      handlePrepareResponse(io, agent, vote)
  }

  /** Accept responses. */
  val acceptResponseFunction: PaxosFunction = {
    case PaxosEvent(io, agent, vote: AcceptResponse) =>
      handleAcceptResponse(io, agent, vote)
  }

  /**
   * Here on a timeout we deal with either pending prepares or pending accepts putting a priority on prepare handling
   * which backs down easily. Only if we have dealt with all timed out prepares do we handle timed out accepts which
   * is more aggressive as it attempts to go-higher than any other node number.
   *
   * Both guards compare the clock with `>=`, so that when the clock lands exactly on the timeout
   * pending prepares still take priority over pending accepts.
   */
  val resendPreparesAndAcceptsFunction: PaxosFunction = {
    // if we have timed-out on prepare messages
    case PaxosEvent(io, agent, CheckTimeout) if agent.data.prepareResponses.nonEmpty && io.clock >= agent.data.timeout =>
      handleResendPrepares(io, agent, io.clock)
    // if we have timed-out on accept messages
    case PaxosEvent(io, agent, CheckTimeout) if agent.data.acceptResponses.nonEmpty && io.clock >= agent.data.timeout =>
      handleResendAccepts(io, agent, io.clock)
  }

  /** Commit messages seen by a non-follower are passed to the return-to-follower handling. */
  val backDownOnHigherCommit: PaxosFunction = {
    case PaxosEvent(io, agent, c: Commit) =>
      handleReturnToFollowerOnHigherCommit(io, agent, c)
  }

  /** Handling specific to the Recoverer role. */
  val recoveringFunction: PaxosFunction =
    takeoverFunction orElse
      acceptResponseFunction orElse
      resendPreparesAndAcceptsFunction orElse
      backDownOnHigherCommit

  /** The complete handler chain used when the agent is a Recoverer. */
  val recovererFunction: PaxosFunction = notLeaderFunction orElse recoveringFunction orElse lastFunction

  /** Handling specific to the Leader role. */
  val leadingFunction: PaxosFunction = {
    // heartbeats the highest commit message
    case PaxosEvent(io, agent, HeartBeat) =>
      io.send(Commit(agent.data.progress.highestCommitted))
      agent
    // broadcasts a new client value
    case PaxosEvent(io, agent, value: CommandValue) =>
      handleClientCommand(io, agent, value)
    // ignore late vote as we would have transitioned on a majority ack
    case PaxosEvent(_, agent, _: PrepareResponse) =>
      agent
  }

  /** The complete handler chain used when the agent is a Leader. */
  val leaderFunction: PaxosFunction =
    leadingFunction orElse
      acceptResponseFunction orElse
      resendPreparesAndAcceptsFunction orElse
      backDownOnHigherCommit orElse
      lastFunction

  /**
   * Processes one event with the handler chain matching the role of the event's agent.
   *
   * @param e The event to process.
   * @return The agent produced by the selected handler.
   */
  def apply(e: PaxosEvent): PaxosAgent = e.agent.role match {
    case Follower => followerFunction(e)
    case Recoverer => recovererFunction(e)
    case Leader => leaderFunction(e)
  }
}