forked from hashicorp/nomad
-
Notifications
You must be signed in to change notification settings - Fork 1
/
client_rpc.go
299 lines (254 loc) · 8.11 KB
/
client_rpc.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
package nomad
import (
"errors"
"fmt"
"net"
"time"
multierror "github.com/hashicorp/go-multierror"
msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/hashicorp/nomad/helper/pool"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/yamux"
"github.com/ugorji/go/codec"
)
// nodeConnState is used to track connection information about a Nomad Client.
type nodeConnState struct {
// Session holds the multiplexed yamux Session for dialing back.
Session *yamux.Session
// Established is when the connection was established.
Established time.Time
// Ctx is the full RPC context
Ctx *RPCContext
}
// getNodeConn returns the connection to the given node and whether it exists.
func (s *Server) getNodeConn(nodeID string) (*nodeConnState, bool) {
s.nodeConnsLock.RLock()
defer s.nodeConnsLock.RUnlock()
conns, ok := s.nodeConns[nodeID]
if !ok {
return nil, false
}
// Return the latest conn
var state *nodeConnState
for _, conn := range conns {
if state == nil || state.Established.Before(conn.Established) {
state = conn
}
}
// Shouldn't happen but rather be safe
if state == nil {
s.logger.Named("client_rpc").Warn("node exists in node connection map without any connection", "node_id", nodeID)
return nil, false
}
return state, ok
}
// connectedNodes returns the set of nodes we have a connection with.
func (s *Server) connectedNodes() map[string]time.Time {
s.nodeConnsLock.RLock()
defer s.nodeConnsLock.RUnlock()
nodes := make(map[string]time.Time, len(s.nodeConns))
for nodeID, conns := range s.nodeConns {
for _, conn := range conns {
if nodes[nodeID].Before(conn.Established) {
nodes[nodeID] = conn.Established
}
}
}
return nodes
}
// addNodeConn adds the mapping between a node and its session.
func (s *Server) addNodeConn(ctx *RPCContext) {
// Hotpath the no-op
if ctx == nil || ctx.NodeID == "" {
return
}
s.nodeConnsLock.Lock()
defer s.nodeConnsLock.Unlock()
// Capture the tracked connections so far
currentConns := s.nodeConns[ctx.NodeID]
// Check if we already have the connection. If we do, just update the
// establish time.
for _, c := range currentConns {
if c.Ctx.Conn.LocalAddr().String() == ctx.Conn.LocalAddr().String() &&
c.Ctx.Conn.RemoteAddr().String() == ctx.Conn.RemoteAddr().String() {
c.Established = time.Now()
return
}
}
// Add the new conn
s.nodeConns[ctx.NodeID] = append(s.nodeConns[ctx.NodeID], &nodeConnState{
Session: ctx.Session,
Established: time.Now(),
Ctx: ctx,
})
}
// removeNodeConn removes the mapping between a node and its session.
func (s *Server) removeNodeConn(ctx *RPCContext) {
// Hotpath the no-op
if ctx == nil || ctx.NodeID == "" {
return
}
s.nodeConnsLock.Lock()
defer s.nodeConnsLock.Unlock()
conns, ok := s.nodeConns[ctx.NodeID]
if !ok {
return
}
// It is important that we check that the connection being removed is the
// actual stored connection for the client. It is possible for the client to
// dial various addresses that all route to the same server. The most common
// case for this is the original address the client uses to connect to the
// server differs from the advertised address sent by the heartbeat.
for i, conn := range conns {
if conn.Ctx.Conn.LocalAddr().String() == ctx.Conn.LocalAddr().String() &&
conn.Ctx.Conn.RemoteAddr().String() == ctx.Conn.RemoteAddr().String() {
if len(conns) == 1 {
// We are deleting the last conn, remove it from the map
delete(s.nodeConns, ctx.NodeID)
} else {
// Slice out the connection we are deleting
s.nodeConns[ctx.NodeID] = append(s.nodeConns[ctx.NodeID][:i], s.nodeConns[ctx.NodeID][i+1:]...)
}
return
}
}
}
// serverWithNodeConn is used to determine which remote server has the most
// recent connection to the given node. The local server is not queried.
// ErrNoNodeConn is returned if all local peers could be queried but did not
// have a connection to the node. Otherwise if a connection could not be found
// and there were RPC errors, an error is returned.
func (s *Server) serverWithNodeConn(nodeID, region string) (*serverParts, error) {
// We skip ourselves.
selfAddr := s.LocalMember().Addr.String()
// Build the request
req := &structs.NodeSpecificRequest{
NodeID: nodeID,
QueryOptions: structs.QueryOptions{
Region: s.config.Region,
},
}
// Select the list of servers to check based on what region we are querying
s.peerLock.RLock()
var rawTargets []*serverParts
if region == s.Region() {
rawTargets = make([]*serverParts, 0, len(s.localPeers))
for _, srv := range s.localPeers {
rawTargets = append(rawTargets, srv)
}
} else {
peers, ok := s.peers[region]
if !ok {
s.peerLock.RUnlock()
return nil, structs.ErrNoRegionPath
}
rawTargets = peers
}
targets := make([]*serverParts, 0, len(rawTargets))
for _, target := range rawTargets {
targets = append(targets, target.Copy())
}
s.peerLock.RUnlock()
// connections is used to store the servers that have connections to the
// requested node.
var mostRecentServer *serverParts
var mostRecent time.Time
var rpcErr multierror.Error
for _, server := range targets {
if server.Addr.String() == selfAddr {
continue
}
// Make the RPC
var resp structs.NodeConnQueryResponse
err := s.connPool.RPC(s.config.Region, server.Addr, server.MajorVersion,
"Status.HasNodeConn", &req, &resp)
if err != nil {
multierror.Append(&rpcErr, fmt.Errorf("failed querying server %q: %v", server.Addr.String(), err))
continue
}
if resp.Connected && resp.Established.After(mostRecent) {
mostRecentServer = server
mostRecent = resp.Established
}
}
// Return an error if there is no route to the node.
if mostRecentServer == nil {
if err := rpcErr.ErrorOrNil(); err != nil {
return nil, err
}
return nil, structs.ErrNoNodeConn
}
return mostRecentServer, nil
}
// NodeRpc is used to make an RPC call to a node. The method takes the
// Yamux session for the node and the method to be called.
func NodeRpc(session *yamux.Session, method string, args, reply interface{}) error {
// Open a new session
stream, err := session.Open()
if err != nil {
return err
}
defer stream.Close()
// Write the RpcNomad byte to set the mode
if _, err := stream.Write([]byte{byte(pool.RpcNomad)}); err != nil {
stream.Close()
return err
}
// Make the RPC
err = msgpackrpc.CallWithCodec(pool.NewClientCodec(stream), method, args, reply)
if err != nil {
return err
}
return nil
}
// NodeStreamingRpc is used to make a streaming RPC call to a node. The method
// takes the Yamux session for the node and the method to be called. It conducts
// the initial handshake and returns a connection to be used or an error. It is
// the callers responsibility to close the connection if there is no error.
func NodeStreamingRpc(session *yamux.Session, method string) (net.Conn, error) {
// Open a new session
stream, err := session.Open()
if err != nil {
return nil, err
}
// Write the RpcNomad byte to set the mode
if _, err := stream.Write([]byte{byte(pool.RpcStreaming)}); err != nil {
stream.Close()
return nil, err
}
// Send the header
encoder := codec.NewEncoder(stream, structs.MsgpackHandle)
decoder := codec.NewDecoder(stream, structs.MsgpackHandle)
header := structs.StreamingRpcHeader{
Method: method,
}
if err := encoder.Encode(header); err != nil {
stream.Close()
return nil, err
}
// Wait for the acknowledgement
var ack structs.StreamingRpcAck
if err := decoder.Decode(&ack); err != nil {
stream.Close()
return nil, err
}
if ack.Error != "" {
stream.Close()
return nil, errors.New(ack.Error)
}
return stream, nil
}
// findNodeConnAndForward is a helper for finding the server with a connection
// to the given node and forwarding the RPC to the correct server. This does not
// work for streaming RPCs.
func findNodeConnAndForward(srv *Server, nodeID, method string, args, reply interface{}) error {
// Determine the Server that has a connection to the node.
srvWithConn, err := srv.serverWithNodeConn(nodeID, srv.Region())
if err != nil {
return err
}
if srvWithConn == nil {
return structs.ErrNoNodeConn
}
return srv.forwardServer(srvWithConn, method, args, reply)
}