/
impl_follower.go
87 lines (74 loc) · 2.76 KB
/
impl_follower.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
package curator
import (
"context"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
common "source.monogon.dev/metropolis/node"
"source.monogon.dev/metropolis/node/core/consensus/client"
"source.monogon.dev/metropolis/node/core/identity"
"source.monogon.dev/metropolis/node/core/rpc"
"source.monogon.dev/metropolis/pkg/event/memory"
cpb "source.monogon.dev/metropolis/node/core/curator/proto/api"
)
type curatorFollower struct {
etcd client.Namespaced
followerID string
node identity.Node
status *memory.Value[*electionStatus]
}
func (f *curatorFollower) GetCurrentLeader(_ *cpb.GetCurrentLeaderRequest, srv cpb.CuratorLocal_GetCurrentLeaderServer) error {
ctx := srv.Context()
w := f.status.Watch()
defer w.Close()
for {
st, err := w.Get(srv.Context())
if err != nil {
return err
}
if st.leader != nil {
return status.Errorf(codes.Unavailable, "node became leader, try again")
}
lock := st.follower.lock
// Manually load node status data from etcd, even though we are not a leader.
// This is fine, as if we ever end up serving stale data, the client will
// realize and call us again.
key, err := nodeEtcdPrefix.Key(lock.NodeId)
if err != nil {
rpc.Trace(ctx).Printf("invalid leader node id %q: %v", lock.NodeId, err)
return status.Errorf(codes.Internal, "current leader has invalid node id")
}
res, err := f.etcd.Get(ctx, key)
if err != nil {
rpc.Trace(ctx).Printf("could not get current leader's data: %v", err)
return status.Errorf(codes.Internal, "could not get current leader's data")
}
if len(res.Kvs) < 1 {
rpc.Trace(ctx).Printf("could not get current leader's data: 0 kvs")
return status.Errorf(codes.Internal, "could not get current leader's data")
}
node, err := nodeUnmarshal(res.Kvs[0].Value)
if err != nil {
rpc.Trace(ctx).Printf("could not unmarshal leader node %s: %v", lock.NodeId, err)
return status.Errorf(codes.Unavailable, "could not unmarshal leader node")
}
if node.status == nil || node.status.ExternalAddress == "" {
rpc.Trace(ctx).Printf("leader node %s has no address in status", lock.NodeId)
return status.Errorf(codes.Unavailable, "current leader has no reported address")
}
rpc.Trace(ctx).Printf("Sending leader: %s at %s", lock.NodeId, node.status.ExternalAddress)
err = srv.Send(&cpb.GetCurrentLeaderResponse{
LeaderNodeId: lock.NodeId,
LeaderHost: node.status.ExternalAddress,
LeaderPort: int32(common.CuratorServicePort),
ThisNodeId: f.followerID,
})
if err != nil {
return err
}
}
}
func (f *curatorFollower) GetCACertificate(ctx context.Context, _ *cpb.GetCACertificateRequest) (*cpb.GetCACertificateResponse, error) {
return &cpb.GetCACertificateResponse{
IdentityCaCertificate: f.node.ClusterCA().Raw,
}, nil
}