-
Notifications
You must be signed in to change notification settings - Fork 110
/
fake_cloud.go
206 lines (162 loc) · 5.07 KB
/
fake_cloud.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
// Package testutils helpers for testing the config retrievial.
package testutils
import (
"context"
"errors"
"net"
"net/http"
"sync"
"github.com/edaniels/golog"
pb "go.viam.com/api/app/v1"
"go.viam.com/utils"
"go.viam.com/utils/rpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
rutils "go.viam.com/rdk/utils"
)
// FakeCredentialPayLoad the hardcoded payload for all devices.
const FakeCredentialPayLoad = "some-secret"
// FakeCloudServer fake implementation of the Viam Cloud RobotService.
type FakeCloudServer struct {
pb.UnimplementedRobotServiceServer
rpcServer rpc.Server
listener net.Listener
exitWg sync.WaitGroup
deviceConfigs map[string]*configAndCerts
failOnConfigAndCerts bool
mu sync.Mutex
}
type configAndCerts struct {
cfg *pb.RobotConfig
certs *pb.CertificateResponse
}
// NewFakeCloudServer creates and starts a new grpc server for the Viam Cloud.
func NewFakeCloudServer(ctx context.Context, logger golog.Logger) (*FakeCloudServer, error) {
listener, err := net.ListenTCP("tcp", &net.TCPAddr{Port: 0})
if err != nil {
return nil, err
}
server := &FakeCloudServer{
listener: listener,
deviceConfigs: map[string]*configAndCerts{},
}
server.rpcServer, err = rpc.NewServer(logger,
rpc.WithDisableMulticastDNS(),
rpc.WithAuthHandler(rutils.CredentialsTypeRobotSecret, rpc.AuthHandlerFunc(
server.robotSecretAuthenticate,
)),
rpc.WithEntityDataLoader(rutils.CredentialsTypeRobotSecret, rpc.EntityDataLoaderFunc(
server.robotSecretEntityDataLoad,
)),
rpc.WithWebRTCServerOptions(rpc.WebRTCServerOptions{Enable: false}))
if err != nil {
return nil, err
}
err = server.rpcServer.RegisterServiceServer(
ctx,
&pb.RobotService_ServiceDesc,
server,
pb.RegisterRobotServiceHandlerFromEndpoint,
)
if err != nil {
return nil, err
}
server.exitWg.Add(1)
utils.PanicCapturingGo(func() {
defer server.exitWg.Done()
err := server.rpcServer.Serve(server.listener)
if err != nil && !errors.Is(err, http.ErrServerClosed) {
logger.Warnf("Error shutting down grpc server", "error", err)
}
})
return server, nil
}
// FailOnConfigAndCerts if `failure` is true the server will return an Internal error on
// all certficate and config requests.
func (s *FakeCloudServer) FailOnConfigAndCerts(failure bool) {
s.mu.Lock()
defer s.mu.Unlock()
s.failOnConfigAndCerts = failure
}
// Addr returns the listeners address.
func (s *FakeCloudServer) Addr() net.Addr {
return s.listener.Addr()
}
// Shutdown will stop the server.
func (s *FakeCloudServer) Shutdown() error {
s.mu.Lock()
defer s.mu.Unlock()
err := s.rpcServer.Stop()
if err != nil {
return err
}
s.exitWg.Wait()
return nil
}
// Clear resets the fake servers state, does not restart the server.
func (s *FakeCloudServer) Clear() {
s.mu.Lock()
defer s.mu.Unlock()
s.deviceConfigs = map[string]*configAndCerts{}
}
// StoreDeviceConfig store config and cert data for the device id.
func (s *FakeCloudServer) StoreDeviceConfig(id string, cfg *pb.RobotConfig, cert *pb.CertificateResponse) {
s.mu.Lock()
defer s.mu.Unlock()
s.deviceConfigs[id] = &configAndCerts{cfg: cfg, certs: cert}
}
// Config impl.
func (s *FakeCloudServer) Config(ctx context.Context, req *pb.ConfigRequest) (*pb.ConfigResponse, error) {
s.mu.Lock()
defer s.mu.Unlock()
if s.failOnConfigAndCerts {
return nil, status.Error(codes.Internal, "oops failure")
}
d, ok := s.deviceConfigs[req.Id]
if !ok {
return nil, status.Error(codes.NotFound, "config for device not found")
}
return &pb.ConfigResponse{Config: d.cfg}, nil
}
// Certificate impl.
func (s *FakeCloudServer) Certificate(ctx context.Context, req *pb.CertificateRequest) (*pb.CertificateResponse, error) {
s.mu.Lock()
defer s.mu.Unlock()
if s.failOnConfigAndCerts {
return nil, status.Error(codes.Internal, "oops failure")
}
d, ok := s.deviceConfigs[req.Id]
if !ok {
return nil, status.Error(codes.NotFound, "cert for device not found")
}
return d.certs, nil
}
// Log impl.
func (s *FakeCloudServer) Log(ctx context.Context, req *pb.LogRequest) (*pb.LogResponse, error) {
return nil, status.Error(codes.Unimplemented, "method Log not implemented")
}
// NeedsRestart impl.
func (s *FakeCloudServer) NeedsRestart(ctx context.Context, req *pb.NeedsRestartRequest) (*pb.NeedsRestartResponse, error) {
return nil, status.Error(codes.Unimplemented, "method NeedsRestart not implemented")
}
func (s *FakeCloudServer) robotSecretAuthenticate(ctx context.Context, entity, payload string) (map[string]string, error) {
s.mu.Lock()
defer s.mu.Unlock()
_, ok := s.deviceConfigs[entity]
if !ok {
return nil, errors.New("failed to auth device not found in fake server")
}
if payload != FakeCredentialPayLoad {
return nil, errors.New("failed to auth device payload does not match")
}
return map[string]string{}, nil
}
func (s *FakeCloudServer) robotSecretEntityDataLoad(ctx context.Context, claims rpc.Claims) (interface{}, error) {
s.mu.Lock()
defer s.mu.Unlock()
_, ok := s.deviceConfigs[claims.Entity()]
if !ok {
return nil, errors.New("failed to verify entity in fake server")
}
return map[string]string{}, nil
}