/
curator.go
110 lines (97 loc) · 2.88 KB
/
curator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
package util
import (
"context"
"net"
"testing"
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/test/bufconn"
apb "source.monogon.dev/metropolis/node/core/curator/proto/api"
cpb "source.monogon.dev/metropolis/proto/common"
"source.monogon.dev/metropolis/pkg/event/memory"
)
// TestCurator is a shim Curator implementation that serves pending Watch
// requests based on data submitted to a channel.
type TestCurator struct {
apb.UnimplementedCuratorServer
watchC chan *apb.WatchEvent
updateReq memory.Value[*apb.UpdateNodeClusterNetworkingRequest]
}
// Watch implements a minimum Watch which just returns all nodes at once.
func (t *TestCurator) Watch(_ *apb.WatchRequest, srv apb.Curator_WatchServer) error {
ctx := srv.Context()
for {
select {
case <-ctx.Done():
return ctx.Err()
case ev := <-t.watchC:
if err := srv.Send(ev); err != nil {
return err
}
}
}
}
func (t *TestCurator) UpdateNodeClusterNetworking(ctx context.Context, req *apb.UpdateNodeClusterNetworkingRequest) (*apb.UpdateNodeClusterNetworkingResponse, error) {
t.updateReq.Set(req)
return &apb.UpdateNodeClusterNetworkingResponse{}, nil
}
// NodeWithPrefixes submits a given node/key/address with prefixes to the Watch
// event channel.
func (t *TestCurator) NodeWithPrefixes(key wgtypes.Key, id, address string, prefixes ...string) {
var p []*cpb.NodeClusterNetworking_Prefix
for _, prefix := range prefixes {
p = append(p, &cpb.NodeClusterNetworking_Prefix{Cidr: prefix})
}
n := &apb.Node{
Id: id,
Status: &cpb.NodeStatus{
ExternalAddress: address,
},
Clusternet: &cpb.NodeClusterNetworking{
WireguardPubkey: key.PublicKey().String(),
Prefixes: p,
},
Roles: &cpb.NodeRoles{
ConsensusMember: &cpb.NodeRoles_ConsensusMember{},
},
}
t.watchC <- &apb.WatchEvent{
Nodes: []*apb.Node{
n,
},
}
}
// DeleteNode submits a given node for deletion to the Watch event channel.
func (t *TestCurator) DeleteNode(id string) {
t.watchC <- &apb.WatchEvent{
NodeTombstones: []*apb.WatchEvent_NodeTombstone{
{
NodeId: id,
},
},
}
}
// MakeTestCurator returns a working TestCurator alongside a grpc connection to
// it.
func MakeTestCurator(t *testing.T) (*TestCurator, *grpc.ClientConn) {
cur := &TestCurator{
watchC: make(chan *apb.WatchEvent),
}
srv := grpc.NewServer()
apb.RegisterCuratorServer(srv, cur)
externalLis := bufconn.Listen(1024 * 1024)
go func() {
if err := srv.Serve(externalLis); err != nil {
t.Fatalf("GRPC serve failed: %v", err)
}
}()
withLocalDialer := grpc.WithContextDialer(func(_ context.Context, _ string) (net.Conn, error) {
return externalLis.Dial()
})
cl, err := grpc.Dial("local", withLocalDialer, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
t.Fatalf("Dialing GRPC failed: %v", err)
}
return cur, cl
}