impl_leader_management.go

package curator

import (
	"bytes"
	"context"
	"crypto/ed25519"
	"sort"
	"time"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
	dpb "google.golang.org/protobuf/types/known/durationpb"

	"source.monogon.dev/metropolis/node/core/consensus"
	"source.monogon.dev/metropolis/node/core/identity"
	"source.monogon.dev/metropolis/node/core/rpc"
	apb "source.monogon.dev/metropolis/proto/api"
	cpb "source.monogon.dev/metropolis/proto/common"
)

type leaderManagement struct {
	*leadership

	// node is the identity of the node on which leaderManagement runs. It's
	// used by GetClusterInfo, which needs access to the CA pubkey.
	// Alternatively this could be stored in etcd, instead of being dependency
	// injected here.
	node *identity.Node
}

const (
	// registerTicketSize is the size, in bytes, of the RegisterTicket used to
	// perform early perimeter checks for nodes which wish to register into the
	// cluster.
	//
	// The size was picked to offer resistance against online brute-force
	// attacks even in the worst-case scenario (no ratelimiting, no monitoring,
	// zero latency between attacker and cluster). 256 bits of entropy would
	// require 3.6e68 requests per second to brute-force the ticket within 10
	// years. The ticket doesn't need to be manually copied by humans, so the
	// relatively overkill size also doesn't impact usability.
	registerTicketSize = 32

	// HeartbeatInterval is the duration between consecutive heartbeat update
	// messages sent by a node.
	HeartbeatInterval = time.Second * 5

	// HeartbeatTimeout is the duration after which a node is considered to be
	// timing out, given no recent heartbeat updates were received by the leader.
	HeartbeatTimeout = HeartbeatInterval * 2
)

const (
	// registerTicketEtcdPath is the etcd key under which private.RegisterTicket
	// is stored.
	registerTicketEtcdPath = "/global/register_ticket"
)

// GetRegisterTicket implements Management.GetRegisterTicket, which returns
// the cluster's RegisterTicket, creating it first if necessary.
func (l *leaderManagement) GetRegisterTicket(ctx context.Context, req *apb.GetRegisterTicketRequest) (*apb.GetRegisterTicketResponse, error) {
	ticket, err := l.ensureRegisterTicket(ctx)
	if err != nil {
		return nil, err
	}
	return &apb.GetRegisterTicketResponse{
		Ticket: ticket,
	}, nil
}

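// ensureRegisterTicket is defined elsewhere in this package. As a minimal
// illustration of what generating a fresh ticket of registerTicketSize bytes
// could look like (a sketch using crypto/rand, not the actual implementation,
// which also persists the ticket under registerTicketEtcdPath):
//
//	import "crypto/rand"
//
//	func generateRegisterTicket() ([]byte, error) {
//		// rand.Read fills the slice with cryptographically secure random
//		// bytes, giving the 256 bits of entropy discussed above.
//		ticket := make([]byte, registerTicketSize)
//		if _, err := rand.Read(ticket); err != nil {
//			return nil, err
//		}
//		return ticket, nil
//	}
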
// GetClusterInfo implements Management.GetClusterInfo, which returns summary
// information about the Metropolis cluster.
func (l *leaderManagement) GetClusterInfo(ctx context.Context, req *apb.GetClusterInfoRequest) (*apb.GetClusterInfoResponse, error) {
	res, err := l.txnAsLeader(ctx, nodeEtcdPrefix.Range())
	if err != nil {
		return nil, status.Errorf(codes.Unavailable, "could not retrieve list of nodes: %v", err)
	}

	// Keep only UP nodes, sort them by public key, and use the first 15 in the
	// cluster directory (limited to an arbitrary amount that doesn't overload
	// callers with unnecessary information).
	//
	// MVP: this should be formalized and possibly re-designed/engineered.
	kvs := res.Responses[0].GetResponseRange().Kvs
	var nodes []*Node
	for _, kv := range kvs {
		node, err := nodeUnmarshal(kv.Value)
		if err != nil {
			rpc.Trace(ctx).Printf("Unmarshalling node %q failed: %v", kv.Value, err)
			continue
		}
		if node.state != cpb.NodeState_NODE_STATE_UP {
			continue
		}
		nodes = append(nodes, node)
	}
	sort.Slice(nodes, func(i, j int) bool {
		return bytes.Compare(nodes[i].pubkey, nodes[j].pubkey) < 0
	})
	if len(nodes) > 15 {
		nodes = nodes[:15]
	}

	// Build cluster directory.
	directory := &cpb.ClusterDirectory{
		Nodes: make([]*cpb.ClusterDirectory_Node, len(nodes)),
	}
	for i, node := range nodes {
		var addresses []*cpb.ClusterDirectory_Node_Address
		if node.status != nil && node.status.ExternalAddress != "" {
			addresses = append(addresses, &cpb.ClusterDirectory_Node_Address{
				Host: node.status.ExternalAddress,
			})
		}
		directory.Nodes[i] = &cpb.ClusterDirectory_Node{
			PublicKey: node.pubkey,
			Addresses: addresses,
		}
	}

	resp := &apb.GetClusterInfoResponse{
		ClusterDirectory: directory,
		CaCertificate:    l.node.ClusterCA().Raw,
	}
	cl, err := clusterLoad(ctx, l.leadership)
	if err == nil {
		resp.ClusterConfiguration, _ = cl.proto()
	}
	return resp, nil
}

// nodeHeartbeatTimestamp returns node nid's last heartbeat timestamp, as
// seen from the Curator leader's perspective. If no heartbeats were received
// from the node, a zero time.Time value is returned.
func (l *leaderManagement) nodeHeartbeatTimestamp(nid string) time.Time {
	smv, ok := l.ls.heartbeatTimestamps.Load(nid)
	if ok {
		return smv.(time.Time)
	}
	return time.Time{}
}

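// The producer side of heartbeatTimestamps lives outside this file. A minimal
// sketch of how a heartbeat RPC handler might record a timestamp into the
// sync.Map (an assumption for illustration, not the actual handler):
//
//	func (l *leadership) recordHeartbeat(nid string) {
//		// Values are stored as time.Time, which is why the Load result
//		// above is type-asserted back to time.Time.
//		l.ls.heartbeatTimestamps.Store(nid, time.Now())
//	}
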
// nodeHealth returns the node's health, along with the duration since the
// last heartbeat was received, given a current timestamp.
func (l *leaderManagement) nodeHealth(node *Node, now time.Time) (apb.Node_Health, time.Duration) {
	// Get the last received node heartbeat's timestamp.
	nid := identity.NodeID(node.pubkey)
	nts := l.nodeHeartbeatTimestamp(nid)
	// lhb is the duration since the last heartbeat was received.
	lhb := now.Sub(nts)

	// Determine the node's health based on the heartbeat timestamp.
	var nh apb.Node_Health
	if node.state == cpb.NodeState_NODE_STATE_UP {
		// Only UP nodes can send heartbeats.
		switch {
		// If no heartbeats were received, but the leadership has only just
		// started, the node's health is unknown.
		case nts.IsZero() && (now.Sub(l.ls.startTs) < HeartbeatTimeout):
			nh = apb.Node_UNKNOWN
		// If the leader had received heartbeats from the node, but the last
		// heartbeat is stale, the node is timing out.
		case lhb > HeartbeatTimeout:
			nh = apb.Node_HEARTBEAT_TIMEOUT
		// Otherwise, the node can be declared healthy.
		default:
			nh = apb.Node_HEALTHY
		}
	} else {
		// Since the node isn't UP, its health is unknown. Non-UP nodes can't
		// access the heartbeat RPC.
		nh = apb.Node_UNKNOWN
	}
	return nh, lhb
}

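// A worked illustration of the policy above, using the constants defined in
// this file (HeartbeatInterval = 5s, HeartbeatTimeout = 10s); the concrete
// durations are hypothetical:
//
//	// Last heartbeat 7s ago: 7s < HeartbeatTimeout, so Node_HEALTHY.
//	// Last heartbeat 11s ago: 11s > HeartbeatTimeout, so
//	// Node_HEARTBEAT_TIMEOUT.
//	// No heartbeat recorded, leader started 3s ago: Node_UNKNOWN, as the
//	// node may not have had a chance to report in yet. Once the leader has
//	// been running for HeartbeatTimeout, the zero timestamp makes lhb
//	// effectively infinite and the node becomes Node_HEARTBEAT_TIMEOUT.
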
// GetNodes implements Management.GetNodes, which returns a list of nodes from
// the point of view of the cluster.
func (l *leaderManagement) GetNodes(req *apb.GetNodesRequest, srv apb.Management_GetNodesServer) error {
	ctx := srv.Context()

	l.muNodes.Lock()
	defer l.muNodes.Unlock()

	// Retrieve all nodes from etcd in a single Get call.
	res, err := l.txnAsLeader(ctx, nodeEtcdPrefix.Range())
	if err != nil {
		return status.Errorf(codes.Unavailable, "could not retrieve list of nodes: %v", err)
	}

	// Create a CEL filter program, to be used in the reply loop below.
	filter, err := buildNodeFilter(ctx, req.Filter)
	if err != nil {
		return err
	}

	// Get a single monotonic timestamp to reference node heartbeat timestamps
	// against.
	now := time.Now()

	// Convert etcd data into proto nodes, send one streaming response for each
	// node.
	kvs := res.Responses[0].GetResponseRange().Kvs
	for _, kv := range kvs {
		node, err := nodeUnmarshal(kv.Value)
		if err != nil {
			rpc.Trace(ctx).Printf("Unmarshalling node %q failed: %v", kv.Value, err)
			continue
		}
		// Convert node roles.
		roles := &cpb.NodeRoles{}
		if node.kubernetesController != nil {
			roles.KubernetesController = &cpb.NodeRoles_KubernetesController{}
		}
		if node.kubernetesWorker != nil {
			roles.KubernetesWorker = &cpb.NodeRoles_KubernetesWorker{}
		}
		if node.consensusMember != nil {
			roles.ConsensusMember = &cpb.NodeRoles_ConsensusMember{}
		}
		// Assess the node's health.
		health, lhb := l.nodeHealth(node, now)

		entry := apb.Node{
			Pubkey:             node.pubkey,
			Id:                 identity.NodeID(node.pubkey),
			State:              node.state,
			Status:             node.status,
			Roles:              roles,
			TimeSinceHeartbeat: dpb.New(lhb),
			Health:             health,
			TpmUsage:           node.tpmUsage,
		}

		// Evaluate the filter expression for this node. Send the node only if
		// the filter keeps it.
		keep, err := filter(ctx, &entry)
		if err != nil {
			return err
		}
		if !keep {
			continue
		}
		if err := srv.Send(&entry); err != nil {
			return err
		}
	}
	return nil
}

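// A minimal client-side sketch of consuming this streaming RPC. The
// connection setup, error handling elisions, and the exact filter expression
// syntax are assumptions for illustration, not part of this file:
//
//	mgmt := apb.NewManagementClient(conn)
//	stream, err := mgmt.GetNodes(ctx, &apb.GetNodesRequest{
//		Filter: `node.state == NODE_STATE_UP`, // hypothetical CEL expression
//	})
//	if err != nil { /* handle error */ }
//	for {
//		node, err := stream.Recv()
//		if err == io.EOF {
//			break // server finished streaming all matching nodes
//		}
//		if err != nil { /* handle error */ }
//		log.Printf("node %s: health %s", node.Id, node.Health)
//	}
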
// ApproveNode implements Management.ApproveNode, which marks a NEW node as
// STANDBY, approving it into the cluster. The call is idempotent for nodes
// that are already STANDBY or UP.
func (l *leaderManagement) ApproveNode(ctx context.Context, req *apb.ApproveNodeRequest) (*apb.ApproveNodeResponse, error) {
	// MVP: check if policy allows for this node to be approved for this
	// cluster. This should happen automatically, if possible, via hardware
	// attestation against policy, not manually.

	// Ensure the given key resembles a public key before using it to generate
	// a node ID. This key is then used to craft an arbitrary etcd path, so
	// let's do an early check in case the user set something that's obviously
	// not a public key.
	if len(req.Pubkey) != ed25519.PublicKeySize {
		return nil, status.Errorf(codes.InvalidArgument, "pubkey must be %d bytes long", ed25519.PublicKeySize)
	}

	l.muNodes.Lock()
	defer l.muNodes.Unlock()

	// Find the node for this pubkey.
	id := identity.NodeID(req.Pubkey)
	node, err := nodeLoad(ctx, l.leadership, id)
	if err != nil {
		return nil, err
	}

	// Ensure the node is either UP/STANDBY (no-op) or NEW (set to STANDBY).
	switch node.state {
	case cpb.NodeState_NODE_STATE_UP, cpb.NodeState_NODE_STATE_STANDBY:
		// No-op for idempotency.
		return &apb.ApproveNodeResponse{}, nil
	case cpb.NodeState_NODE_STATE_NEW:
		// What we can act on.
	default:
		return nil, status.Errorf(codes.FailedPrecondition, "node in state %s cannot be approved", node.state)
	}

	// Set the node to STANDBY.
	node.state = cpb.NodeState_NODE_STATE_STANDBY
	if err := nodeSave(ctx, l.leadership, node); err != nil {
		return nil, err
	}
	return &apb.ApproveNodeResponse{}, nil
}

// UpdateNodeRoles implements Management.UpdateNodeRoles, which in addition to
// adjusting the affected node's representation within the cluster, can also
// trigger the addition of a new etcd learner node.
func (l *leaderManagement) UpdateNodeRoles(ctx context.Context, req *apb.UpdateNodeRolesRequest) (*apb.UpdateNodeRolesResponse, error) {
	// Nodes are identifiable by either their public key or their (string)
	// node ID. In case a public key was provided, convert it to the
	// corresponding node ID here.
	var id string
	switch rid := req.Node.(type) {
	case *apb.UpdateNodeRolesRequest_Pubkey:
		if len(rid.Pubkey) != ed25519.PublicKeySize {
			return nil, status.Errorf(codes.InvalidArgument, "pubkey must be %d bytes long", ed25519.PublicKeySize)
		}
		// Convert the pubkey into a node ID.
		id = identity.NodeID(rid.Pubkey)
	case *apb.UpdateNodeRolesRequest_Id:
		id = rid.Id
	default:
		return nil, status.Errorf(codes.InvalidArgument, "exactly one of pubkey or id must be set")
	}

	// Take l.muNodes before modifying the node.
	l.muNodes.Lock()
	defer l.muNodes.Unlock()

	// Find the node matching the resolved node ID.
	node, err := nodeLoad(ctx, l.leadership, id)
	if err == errNodeNotFound {
		return nil, status.Errorf(codes.NotFound, "node %s not found", id)
	}
	if err != nil {
		return nil, status.Errorf(codes.InvalidArgument, "while loading node %s: %v", id, err)
	}

	// Adjust each role if a corresponding value is set within the request. Do
	// nothing if the role already matches the requested value.
	if req.ConsensusMember != nil {
		if *req.ConsensusMember {
			// Add a new etcd learner node.
			w := l.consensus.Watch()
			defer w.Close()
			st, err := w.Get(ctx, consensus.FilterRunning)
			if err != nil {
				return nil, status.Errorf(codes.Unavailable, "could not get running consensus: %v", err)
			}
			join, err := st.AddNode(ctx, node.pubkey)
			if err != nil {
				return nil, status.Errorf(codes.Unavailable, "could not add node: %v", err)
			}
			// Modify the node's state to reflect the change.
			node.EnableConsensusMember(join)
		} else {
			if node.kubernetesController != nil {
				return nil, status.Errorf(codes.FailedPrecondition, "cannot remove consensus member role while node is a Kubernetes controller")
			}
			node.DisableConsensusMember()
		}
	}
	if req.KubernetesController != nil {
		if *req.KubernetesController {
			if node.consensusMember == nil {
				return nil, status.Errorf(codes.FailedPrecondition, "could not set role: Kubernetes controller nodes must also be consensus members")
			}
			node.EnableKubernetesController()
		} else {
			node.DisableKubernetesController()
		}
	}
	if req.KubernetesWorker != nil {
		if *req.KubernetesWorker {
			node.EnableKubernetesWorker()
		} else {
			node.DisableKubernetesWorker()
		}
	}

	if err := nodeSave(ctx, l.leadership, node); err != nil {
		return nil, err
	}
	return &apb.UpdateNodeRolesResponse{}, nil
}

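// The per-role request fields are tri-state *bool values: nil leaves the role
// untouched, while true/false enables or disables it. A minimal client-side
// sketch (illustrative; assumes google.golang.org/protobuf/proto for the
// *bool helper, and the node ID shown is hypothetical):
//
//	import "google.golang.org/protobuf/proto"
//
//	_, err := mgmt.UpdateNodeRoles(ctx, &apb.UpdateNodeRolesRequest{
//		Node:            &apb.UpdateNodeRolesRequest_Id{Id: "metropolis-1234"},
//		ConsensusMember: proto.Bool(true),
//		// KubernetesController and KubernetesWorker are nil: unchanged.
//	})
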
// DecommissionNode implements Management.DecommissionNode.
func (l *leaderManagement) DecommissionNode(ctx context.Context, req *apb.DecommissionNodeRequest) (*apb.DecommissionNodeResponse, error) {
	// Decommissioning is currently unimplemented. We'll get to that soon. For
	// now, use SafetyBypassNotDecommissioned.
	return nil, status.Error(codes.Unimplemented, "unimplemented")
}

// DeleteNode implements Management.DeleteNode, which removes a node from the
// cluster, subject to safety checks that can be explicitly bypassed via the
// request's SafetyBypass fields.
func (l *leaderManagement) DeleteNode(ctx context.Context, req *apb.DeleteNodeRequest) (*apb.DeleteNodeResponse, error) {
	bypassRoles := req.SafetyBypassHasRoles != nil
	bypassDecommissioned := req.SafetyBypassNotDecommissioned != nil

	// Nodes are identifiable by either their public key or their (string)
	// node ID. In case a public key was provided, convert it to the
	// corresponding node ID here.
	var id string
	switch rid := req.Node.(type) {
	case *apb.DeleteNodeRequest_Pubkey:
		if len(rid.Pubkey) != ed25519.PublicKeySize {
			return nil, status.Errorf(codes.InvalidArgument, "pubkey must be %d bytes long", ed25519.PublicKeySize)
		}
		// Convert the pubkey into a node ID.
		id = identity.NodeID(rid.Pubkey)
	case *apb.DeleteNodeRequest_Id:
		id = rid.Id
	default:
		return nil, status.Errorf(codes.InvalidArgument, "exactly one of pubkey or id must be set")
	}

	// Take l.muNodes before modifying the node.
	l.muNodes.Lock()
	defer l.muNodes.Unlock()

	// Find the node matching the resolved node ID.
	node, err := nodeLoad(ctx, l.leadership, id)
	if err == errNodeNotFound {
		return nil, status.Errorf(codes.NotFound, "node %s not found", id)
	}
	if err != nil {
		return nil, status.Errorf(codes.InvalidArgument, "while loading node %s: %v", id, err)
	}

	// Check safety assertions.
	if !bypassRoles {
		if node.consensusMember != nil {
			return nil, status.Error(codes.FailedPrecondition, "node still has ConsensusMember role")
		}
		if node.kubernetesController != nil {
			return nil, status.Error(codes.FailedPrecondition, "node still has KubernetesController role")
		}
		if node.kubernetesWorker != nil {
			return nil, status.Error(codes.FailedPrecondition, "node still has KubernetesWorker role")
		}
	}
	switch node.state {
	case cpb.NodeState_NODE_STATE_NEW:
		// Okay to remove, a NEW node didn't yet receive any data.
	case cpb.NodeState_NODE_STATE_STANDBY:
		// Okay to remove, a STANDBY node didn't yet receive any data.
	case cpb.NodeState_NODE_STATE_UP:
		if !bypassDecommissioned {
			return nil, status.Error(codes.FailedPrecondition, "node must be decommissioned first")
		}
	case cpb.NodeState_NODE_STATE_DECOMMISSIONED:
		// Always okay to remove a decommissioned node.
	default:
		return nil, status.Error(codes.Internal, "node has an invalid internal state")
	}

	// TODO(q3k): ensure deleted nodes are rejected by the leader. Currently
	// the server-side authentication middleware is completely offline. We
	// should either:
	//
	//  1. emit a revocation and distribute it to all nodes, or
	//  2. give some additional middleware to the leader that performs online
	//     verification (which is okay to do on the leader, as the leader
	//     always has access to cluster data).
	err = nodeDestroy(ctx, l.leadership, node)
	return &apb.DeleteNodeResponse{}, err
}
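
// A minimal sketch of forcing removal of an UP node. The exact shape of the
// bypass message type and the node ID are assumptions for illustration, based
// only on the presence checks above:
//
//	_, err := mgmt.DeleteNode(ctx, &apb.DeleteNodeRequest{
//		Node: &apb.DeleteNodeRequest_Id{Id: "metropolis-1234"},
//		// Presence of the bypass field (not its contents) disables the
//		// decommissioned-first precondition.
//		SafetyBypassNotDecommissioned: &apb.DeleteNodeRequest_SafetyBypassNotDecommissioned{},
//	})
//
// Without the bypass, DeleteNode refuses to act on an UP node until
// DecommissionNode (currently unimplemented) has been run against it.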