-
Notifications
You must be signed in to change notification settings - Fork 0
/
RaftTypes.hs
283 lines (222 loc) · 8.07 KB
/
RaftTypes.hs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE TemplateHaskell #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE OverloadedStrings #-}
module RaftTypes where
import qualified Prelude (log)
import Prelude hiding (log)
import Data.Map (Map)
import qualified Data.Map as Map
import GHC.Generics
import Control.Lens
import Data.Aeson
import Control.Concurrent
import Control.Monad.State
import Control.Monad
import Data.Maybe
import Data.Text.Lazy.Encoding
import qualified Data.Text.Lazy as Text
import qualified Data.ByteString.Lazy as BS
newtype MessageId = MessageId Integer
deriving (Eq, Ord, Show, Num, Enum, Generic)
newtype Term = Term Integer
deriving (Eq, Ord, Show, Num, Enum, Generic)
newtype ServerId = ServerId Integer
deriving (Eq, Ord, Show, Num, Enum, Generic)
newtype LogIndex = LogIndex Integer
deriving (Eq, Ord, Show, Num, Enum, Generic)
type ServerMap a = Map ServerId a
data Role = Booting | Leader | Follower | Candidate (ServerMap Bool)
deriving Show
instance Eq Role where
Leader == Leader = True
Follower == Follower = True
(Candidate _) == (Candidate _) = True
_ == _ = False
type Hostname = String
type Port = Int
data LogEntry a = LogEntry {
_entryTerm :: Term,
_entryData :: a
} deriving (Eq, Show, Generic)
makeLenses ''LogEntry
type IndexedEntry a = (LogIndex, LogEntry a)
data Log a = Log {
_logEntries :: [LogEntry a]
} deriving (Eq, Show, Generic)
makeLenses ''Log
logMap :: ([LogEntry a] -> [LogEntry b]) -> Log a -> Log b
logMap f (Log as) = Log (f as)
entry :: LogIndex -> Log a -> Maybe (LogEntry a)
entry (LogIndex nth) (Log es) = es ^? element (fromIntegral nth - 1)
instance ToJSON Term
instance FromJSON Term
instance ToJSON ServerId
instance FromJSON ServerId
instance ToJSON LogIndex
instance FromJSON LogIndex
instance ToJSON a => ToJSON (LogEntry a)
instance FromJSON a => FromJSON (LogEntry a)
instance ToJSON a => ToJSON (Log a)
instance FromJSON a => FromJSON (Log a)
data NilEntry = NilEntry
instance FromJSON NilEntry where
parseJSON _ = return NilEntry
--- Message types
data MessageType = AppendEntries | AppendEntriesResponse
| RequestVote | RequestVoteResponse
deriving (Show, Eq, Generic)
isRequest :: MessageType -> Bool
isRequest AppendEntries = True
isRequest RequestVote = True
isRequest _ = False
isResponse :: MessageType -> Bool
isResponse AppendEntriesResponse = True
isResponse RequestVoteResponse = True
isResponse _ = False
data MessageInfo = MessageInfo {
_msgFrom :: ServerId,
_msgId :: MessageId
} deriving (Show, Generic)
makeLenses ''MessageInfo
newtype EncodedArg = EncodedArg BS.ByteString
deriving Show
rawArg :: EncodedArg -> BS.ByteString
rawArg (EncodedArg bs) = bs
data Message = Message {
_msgType :: MessageType,
_msgArgs :: [(String, EncodedArg)],
_msgInfo :: MessageInfo
} deriving (Show, Generic)
makeLenses ''Message
info :: Lens' Message MessageInfo
info = msgInfo
type BaseMessage = MessageInfo -> Message
type PendingMessage c = (c, Message)
instance ToJSON EncodedArg where
toJSON (EncodedArg bs) = Data.Aeson.String . Text.toStrict . decodeUtf8 $ bs
instance FromJSON EncodedArg where
parseJSON (Data.Aeson.String txt) = pure . EncodedArg . encodeUtf8 . Text.fromStrict $ txt
instance ToJSON MessageType
instance FromJSON MessageType
instance ToJSON MessageId
instance FromJSON MessageId
instance ToJSON MessageInfo
instance FromJSON MessageInfo
instance ToJSON Message
instance FromJSON Message
--- Config types
data ClientConfig = ClientConfig {
_clientHostname :: Hostname,
_clientPort :: Port
} deriving (Eq, Show, Generic)
makeLenses ''ClientConfig
data CohortConfig = CohortConfig {
_cohortId :: ServerId,
_cohortHostname :: Hostname,
_cohortPort :: Port
} deriving (Eq, Show, Generic)
makeLenses ''CohortConfig
data ClusterConfig = ClusterConfig {
_clientConfig :: ClientConfig,
_clusterServers :: [CohortConfig]
} deriving (Eq, Show, Generic)
makeLenses ''ClusterConfig
--- Storage types
type PersistentState a = (Term, Maybe ServerId, Log a)
class Persist s where
writeToStable :: ToJSON a => PersistentState a -> s a -> IO ()
readFromStable :: FromJSON a => s a -> IO (PersistentState a)
fromName :: String -> s a
--- Connection types
data OwnFollower = OwnFollower {
_of_msgQueue :: MVar [Message],
_of_queueNotEmpty :: MVar ()
}
{-# NOINLINE newOwnFollower #-}
newOwnFollower :: IO OwnFollower
newOwnFollower = OwnFollower <$> newMVar [] <*> newEmptyMVar
data SelfConnection a = SelfConnection {
_sc_server :: MVar a,
_sc_msgQueue :: MVar [Message],
_sc_queueNotEmpty :: MVar ()
}
selfConnection :: MVar a -> OwnFollower -> SelfConnection a
selfConnection self (OwnFollower q qFlag) = SelfConnection self q qFlag
--- Server type
data ServerConfig cl s c a = ServerConfig {
_role :: Role,
_ownCohort :: CohortConfig,
_ownFollower :: Maybe OwnFollower,
_cohorts :: ServerMap c,
_storage :: s a,
_client :: cl a
}
makeLenses ''ServerConfig
data Server cl s c a = Server {
--- Raft State
-- Follower state
_currentTerm :: Term,
_votedFor :: Maybe ServerId,
_log :: Log a,
_commitIndex :: LogIndex,
_lastApplied :: LogIndex,
-- Leader-only state
_nextIndex :: Maybe (ServerMap LogIndex),
_matchIndex :: Maybe (ServerMap LogIndex),
--- Non-raft state
_config :: ServerConfig cl s c a,
_outstanding :: Map MessageId Message
}
makeLenses ''Server
serverId :: Lens' (Server cl s c a) ServerId
serverId = config.ownCohort.cohortId
instance Show (ServerConfig cl s c a) where
show conf = "ServerConfig (" ++ show (view role conf) ++ ") (" ++ show (view ownCohort conf) ++ ")"
instance (Show a) => Show (Server cl s c a) where
show s = "=== Server " ++ show (view serverId s) ++ " state ===" ++ "\n"
++ "currentTerm: " ++ show (view currentTerm s) ++ "\n"
++ "votedFor: " ++ show (view votedFor s) ++ "\n"
++ "log: " ++ show (view log s) ++ "\n"
++ "commitIndex: " ++ show (view commitIndex s) ++ "\n"
++ "lastApplied: " ++ show (view lastApplied s) ++ "\n"
++ "nextIndex: " ++ showM (view nextIndex s) ++ "\n"
++ "matchIndex: " ++ showM (view matchIndex s) ++ "\n"
++ "=== end ==="
where showM Nothing = "___"
showM (Just x) = show x
type Raft cl s c a v = State (Server cl s c a) v
--- Accessors and helpers
-- State accessors
lastIndex :: Log a -> LogIndex
lastIndex = LogIndex . fromIntegral . length . view logEntries
viewLastLogIndex :: Server cl s c a -> LogIndex
viewLastLogIndex = lastIndex . view log
lastTerm :: Log a -> Term
lastTerm (Log []) = 0
lastTerm (Log es) = view entryTerm (last es)
viewLastLogTerm :: Server cl s c a -> Term
viewLastLogTerm = lastTerm . view log
withIndices :: Log a -> [(LogIndex, LogEntry a)]
withIndices = zip [1..] . view logEntries
logWithIndices :: Server cl s c a -> [(LogIndex, LogEntry a)]
logWithIndices = withIndices . view log
serverCohorts :: Server cl s c a -> [c]
serverCohorts = map snd . Map.toList . view (config.cohorts)
otherCohortIds :: Server cl s c a -> [ServerId]
otherCohortIds = map fst . Map.toList . view (config.cohorts)
termAtIndex :: LogIndex -> Server cl s c a -> Maybe Term
termAtIndex 0 _ = Just 0
termAtIndex i s = entry i (view log s) >>= Just . view entryTerm
-- Storage helpers
defaultPersistentState :: PersistentState e
defaultPersistentState = (0, Nothing, Log [])
injectPersistentState :: PersistentState a -> Server cl s c a -> Server cl s c a
injectPersistentState (t, v, l) serv = set currentTerm t . set votedFor v . set log l $ serv
extractPersistentState :: Server cl s c a -> PersistentState a
extractPersistentState serv = (view currentTerm serv, view votedFor serv, view log serv)
persist :: (Persist s, ToJSON a) => Server cl s c a -> IO ()
persist serv = writeToStable (extractPersistentState serv) $ view (config.storage) serv
fromPersist :: (Persist s, FromJSON a) => Server cl s c a -> IO (Server cl s c a)
fromPersist serv = readFromStable (view (config.storage) serv) >>= return . flip injectPersistentState serv