/
ServerNodeRPC.go
169 lines (150 loc) · 4.72 KB
/
ServerNodeRPC.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
package server
import (
"github.com/owlfish/forestbus-server/model"
"github.com/owlfish/forestbus/rapi"
)
// RPCHandler holds all exposed RPC methods.
// It carries the ServerNode that calls are delegated to, together with the
// identification state recorded by IdentifyNode.
type RPCHandler struct {
// server is the node that RPC calls are forwarded to.
server *ServerNode
// identified is set by IdentifyNode. RequestVote and AppendEntries do
// nothing while it is false.
identified bool
// peerName is the name supplied by the caller of IdentifyNode.
peerName string
}
// NewRPCHandler creates an RPCHandler that delegates its calls to srv.
// The returned handler starts out unidentified and with an empty peer name.
func NewRPCHandler(srv *ServerNode) *RPCHandler {
	handler := new(RPCHandler)
	handler.server = srv
	return handler
}
// IdentifyNodeArgs contains the arguments for a node calling IdentifyNode.
type IdentifyNodeArgs struct {
// Name is the name of the peer.
Name string
// ClusterID is the ID the peer has been configured with. IdentifyNode
// compares it against this server's own cluster ID.
ClusterID string
}
// IdentifyNodeResults contains the outcome of an IdentifyNode call.
type IdentifyNodeResults struct {
// Result is a success result when the peer's ClusterID matches this
// server's, and a mismatched cluster ID result otherwise.
Result rapi.ResultInfo
}
/*
IdentifyNode is used by other peers in the cluster to identify themselves to this node.

The peer's name is always recorded. The handler is only marked as identified
when the peer's ClusterID matches this server's cluster ID, so a peer that
belongs to a different cluster cannot pass the identification check performed
by RequestVote and AppendEntries. A later call with a mismatched ClusterID
clears a previous successful identification.

results.Result is set to a success result on a match and to a mismatched
cluster ID result otherwise. The returned error is always nil.
*/
func (handler *RPCHandler) IdentifyNode(args *IdentifyNodeArgs, results *IdentifyNodeResults) error {
	// Read the cluster ID once so the comparison and the mismatch report
	// are guaranteed to refer to the same value.
	clusterID := handler.server.GetClusterID()

	handler.peerName = args.Name
	handler.identified = args.ClusterID == clusterID

	if !handler.identified {
		results.Result = rapi.Get_RI_MISMATCHED_CLUSTER_ID(handler.server.address, args.ClusterID, clusterID)
		return nil
	}

	results.Result = rapi.Get_RI_SUCCESS()
	return nil
}
// RequestVoteArgs contains the arguments for a RequestVote call.
type RequestVoteArgs struct {
// Topic selects the topic node that the request is routed to.
Topic string
// Term is presumably the candidate's election term - confirm against the
// node implementation.
Term int64
// CandidateID presumably identifies the peer asking for the vote - confirm
// against the node implementation.
CandidateID string
// LastLogIndex and LastLogTerm presumably describe the last entry in the
// candidate's log (Raft-style) - confirm against the node implementation.
LastLogIndex int64
LastLogTerm int64
}
// RequestVoteResults contains the return data from a RequestVote call.
// It is passed to the topic node's RequestVote method to be filled in, and is
// left untouched when the caller has not identified itself.
type RequestVoteResults struct {
// Term is presumably the term of the responding node - confirm against the
// node implementation.
Term int64
// VoteGranted presumably reports whether the vote was given to the
// candidate - confirm against the node implementation.
VoteGranted bool
}
/*
RequestVote is used by peers to request a vote during an election.

Calls made before the peer has identified itself are ignored: results is left
untouched and nil is returned. Otherwise the request is passed to the node for
args.Topic, or rapi.ERR_TOPIC_NOT_FOUND is returned when there is no such node.
*/
func (handler *RPCHandler) RequestVote(args *RequestVoteArgs, results *RequestVoteResults) error {
	if !handler.identified {
		return nil
	}

	topicNode := handler.server.GetNode(args.Topic)
	if topicNode == nil {
		return rapi.ERR_TOPIC_NOT_FOUND
	}

	topicNode.RequestVote(args, results)
	return nil
}
// AppendEntriesArgs contains the arguments for an AppendEntries call.
type AppendEntriesArgs struct {
// Topic selects the topic node that the request is routed to.
Topic string
// Term is presumably the leader's term - confirm against the node
// implementation.
Term int64
// LeaderID presumably identifies the leader sending the entries - confirm
// against the node implementation.
LeaderID string
// PrevLogIndex and PrevLogTerm presumably describe the log entry that
// immediately precedes Entries (Raft-style) - confirm against the node
// implementation.
PrevLogIndex int64
PrevLogTerm int64
// Entries holds the messages being sent to the follower.
Entries model.Messages
// LeaderFirstIndex is set to the first index that the leader knows about. Used when PrevLogIndex is set to 0.
LeaderFirstIndex int64
// LeaderCommitIndex is presumably the leader's commit index - confirm
// against the node implementation.
LeaderCommitIndex int64
}
// AppendEntriesResults contains the return data from a call by a leader to a follower to append messages.
type AppendEntriesResults struct {
// Term is the term of the follower node.
Term int64
// Success is true if the append worked.
Success bool
// NextIndex contains the next index that the follower is expecting.
// If Success is false this can be used by the leader to determine whether to skip back in time
// to allow faster catchup of new / recovering followers.
NextIndex int64
}
/*
AppendEntries is used by leaders to inform followers of new message entries.

Calls made before the peer has identified itself are ignored: results is left
untouched and nil is returned. Otherwise the request is passed to the node for
args.Topic, or rapi.ERR_TOPIC_NOT_FOUND is returned when there is no such node.
*/
func (handler *RPCHandler) AppendEntries(args *AppendEntriesArgs, results *AppendEntriesResults) error {
	if !handler.identified {
		return nil
	}

	topicNode := handler.server.GetNode(args.Topic)
	if topicNode == nil {
		return rapi.ERR_TOPIC_NOT_FOUND
	}

	topicNode.AppendEntries(args, results)
	return nil
}
// ConfigChangeResults contains the outcome of a ConfigurationChange call.
type ConfigChangeResults struct {
// Result is filled in by the server's ChangeConfiguration method.
Result rapi.ResultInfo
}
/*
ConfigurationChange is used by the admin tool to send new configuration details to the nodes.

The request is handed straight to the server's ChangeConfiguration method,
which reports its outcome through results. No identification check is made
here and the returned error is always nil.
*/
func (handler *RPCHandler) ConfigurationChange(args *ServerNodeConfiguration, results *ConfigChangeResults) error {
	srv := handler.server
	srv.ChangeConfiguration(args, results)
	return nil
}
// Client RPC support
/*
SendMessages is a Client RPC method allowing clients to send new messages to a topic on the leader node.

When the server has a node for args.Topic the request is delegated to it and
its error is returned. When it does not, results.Result is set to a topic not
found result and nil is returned.
*/
func (handler *RPCHandler) SendMessages(args *rapi.SendMessagesArgs, results *rapi.SendMessagesResults) error {
	srv := handler.server
	if topicNode := srv.GetNode(args.Topic); topicNode != nil {
		return topicNode.ClientRequestSendMessages(args, results)
	}

	// Unknown topic: report it through the result rather than as an RPC error.
	results.Result = rapi.Get_RI_TOPIC_NOT_FOUND(srv.address, args.Topic)
	return nil
}
/*
ReceiveMessages is a Client RPC method allowing clients to get messages from this node.

When the server has a node for args.Topic the request is delegated to it and
its error is returned. When it does not, rapi.ERR_TOPIC_NOT_FOUND is returned
and results is left untouched.
*/
func (handler *RPCHandler) ReceiveMessages(args *rapi.ReceiveMessagesArgs, results *rapi.ReceiveMessagesResults) error {
	topicNode := handler.server.GetNode(args.Topic)
	if topicNode == nil {
		return rapi.ERR_TOPIC_NOT_FOUND
	}
	return topicNode.ClientReceiveMessages(args, results)
}
/*
GetClusterDetails is a Client RPC method allowing clients to discover details about the cluster configuration.

The call is delegated to the server's GetClusterDetails method and its error
is returned unchanged.
*/
func (handler *RPCHandler) GetClusterDetails(args *rapi.GetClusterDetailsArgs, results *rapi.GetClusterDetailsResults) error {
	srv := handler.server
	return srv.GetClusterDetails(args, results)
}
/*
GetTopicDetails is a Client RPC method allowing clients to discover details about a topic.

When the server has a node for args.Topic the request is delegated to it and
its error is returned. When it does not, results.Result is set to a topic not
found result and nil is returned.
*/
func (handler *RPCHandler) GetTopicDetails(args *rapi.GetTopicDetailsArgs, results *rapi.GetTopicDetailsResults) error {
	srv := handler.server
	if topicNode := srv.GetNode(args.Topic); topicNode != nil {
		return topicNode.GetTopicDetails(args, results)
	}

	// Unknown topic: report it through the result rather than as an RPC error.
	results.Result = rapi.Get_RI_TOPIC_NOT_FOUND(srv.address, args.Topic)
	return nil
}