/
interceptor.go
1050 lines (872 loc) · 31.4 KB
/
interceptor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
package rpcperms
import (
"context"
"errors"
"fmt"
"sync"
"sync/atomic"
"github.com/btcsuite/btclog"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
"github.com/brolightningnetwork/broln/lnrpc"
"github.com/brolightningnetwork/broln/macaroons"
"github.com/brolightningnetwork/broln/monitoring"
"github.com/brolightningnetwork/broln/subscribe"
"google.golang.org/grpc"
"gopkg.in/macaroon-bakery.v2/bakery"
)
// rpcState is an enum that we use to keep track of the current RPC service
// state. This will transition as we go from startup to unlocking the wallet,
// and finally fully active.
type rpcState uint8
const (
// waitingToStart indicates that we're at the beginning of the startup
// process. In a cluster environment this may mean that we're waiting to
// become the leader in which case RPC calls will be disabled until
// this instance has been elected as leader.
waitingToStart rpcState = iota
// walletNotCreated is the starting state if the RPC server is active,
// but the wallet is not yet created. In this state we'll only allow
// calls to the WalletUnlockerService.
walletNotCreated
// walletLocked indicates the RPC server is active, but the wallet is
// locked. In this state we'll only allow calls to the
// WalletUnlockerService.
walletLocked
// walletUnlocked means that the wallet has been unlocked, but the full
// RPC server is not yet ready.
walletUnlocked
// rpcActive means that the RPC server is ready to accept calls.
rpcActive
// serverActive means that the broln server is ready to accept calls.
serverActive
)
var (
// ErrWaitingToStart is returned if broln is still waiting to start,
// possibly blocked until elected as the leader.
ErrWaitingToStart = fmt.Errorf("waiting to start, RPC services not " +
"available")
// ErrNoWallet is returned if the wallet does not exist.
ErrNoWallet = fmt.Errorf("wallet not created, create one to enable " +
"full RPC access")
// ErrWalletLocked is returned if the wallet is locked and any service
// other than the WalletUnlocker is called.
ErrWalletLocked = fmt.Errorf("wallet locked, unlock it to enable " +
"full RPC access")
// ErrWalletUnlocked is returned if the WalletUnlocker service is
// called when the wallet already has been unlocked.
ErrWalletUnlocked = fmt.Errorf("wallet already unlocked, " +
"WalletUnlocker service is no longer available")
// ErrRPCStarting is returned if the wallet has been unlocked but the
// RPC server is not yet ready to accept calls.
ErrRPCStarting = fmt.Errorf("the RPC server is in the process of " +
"starting up, but not yet ready to accept calls")
// macaroonWhitelist defines methods that we don't require macaroons to
// access. We also allow these methods to be called even if not all
// mandatory middlewares are registered yet. If the wallet is locked
// then a middleware cannot register itself, creating an impossible
// situation. Also, a middleware might want to check the state of broln
// by calling the State service before it registers itself. So we also
// need to exclude those calls from the mandatory middleware check.
macaroonWhitelist = map[string]struct{}{
// We allow all calls to the WalletUnlocker without macaroons.
"/lnrpc.WalletUnlocker/GenSeed": {},
"/lnrpc.WalletUnlocker/InitWallet": {},
"/lnrpc.WalletUnlocker/UnlockWallet": {},
"/lnrpc.WalletUnlocker/ChangePassword": {},
// The State service must be available at all times, even
// before we can check macaroons, so we whitelist it.
"/lnrpc.State/SubscribeState": {},
"/lnrpc.State/GetState": {},
}
)
// InterceptorChain is a struct that can be added to the running GRPC server,
// intercepting API calls. This is useful for logging, enforcing permissions,
// supporting middleware etc. The following diagram shows the order of each
// interceptor in the chain and when exactly requests/responses are intercepted
// and forwarded to external middleware for approval/modification. Middleware in
// general can only intercept gRPC requests/responses that are sent by the
// client with a macaroon that contains a custom caveat that is supported by one
// of the registered middlewares.
//
// |
// | gRPC request from client
// |
// +---v--------------------------------+
// | InterceptorChain |
// +-+----------------------------------+
// | Log Interceptor |
// +----------------------------------+
// | RPC State Interceptor |
// +----------------------------------+
// | Macaroon Interceptor |
// +----------------------------------+--------> +---------------------+
// | RPC Macaroon Middleware Handler |<-------- | External Middleware |
// +----------------------------------+ | - approve request |
// | Prometheus Interceptor | +---------------------+
// +-+--------------------------------+
// | validated gRPC request from client
// +---v--------------------------------+
// | main gRPC server |
// +---+--------------------------------+
// |
// | original gRPC request to client
// |
// +---v--------------------------------+--------> +---------------------+
// | RPC Macaroon Middleware Handler |<-------- | External Middleware |
// +---+--------------------------------+ | - modify response |
// | +---------------------+
// | edited gRPC request to client
// v
type InterceptorChain struct {
// lastRequestID is the ID of the last gRPC request or stream that was
// intercepted by the middleware interceptor.
//
// NOTE: Must be used atomically!
lastRequestID uint64
// Required by the grpc-gateway/v2 library for forward compatibility.
lnrpc.UnimplementedStateServer
started sync.Once
stopped sync.Once
// state is the current RPC state of our RPC server.
state rpcState
// ntfnServer is a subscription server we use to notify clients of the
// State service when the state changes.
ntfnServer *subscribe.Server
// noMacaroons should be set true if we don't want to check macaroons.
noMacaroons bool
// svc is the macaroon service used to enforce permissions in case
// macaroons are used.
svc *macaroons.Service
// permissionMap is the permissions to enforce if macaroons are used.
permissionMap map[string][]bakery.Op
// rpcsLog is the logger used to log calls to the RPCs intercepted.
rpcsLog btclog.Logger
// registeredMiddleware is a map of all macaroon permission based RPC
// middleware clients that are currently registered. The map is keyed
// by the middleware's name.
registeredMiddleware map[string]*MiddlewareHandler
// mandatoryMiddleware is a list of all middleware that is considered to
// be mandatory. If any of them is not registered then all RPC requests
// (except for the macaroon white listed methods and the middleware
// registration itself) are blocked. This is a security feature to make
// sure that requests can't just go through unobserved/unaudited if a
// middleware crashes.
mandatoryMiddleware []string
quit chan struct{}
sync.RWMutex
}
// A compile time check to ensure that InterceptorChain fully implements the
// StateServer gRPC service.
var _ lnrpc.StateServer = (*InterceptorChain)(nil)
// NewInterceptorChain creates a new InterceptorChain.
func NewInterceptorChain(log btclog.Logger, noMacaroons bool,
mandatoryMiddleware []string) *InterceptorChain {
return &InterceptorChain{
state: waitingToStart,
ntfnServer: subscribe.NewServer(),
noMacaroons: noMacaroons,
permissionMap: make(map[string][]bakery.Op),
rpcsLog: log,
registeredMiddleware: make(map[string]*MiddlewareHandler),
mandatoryMiddleware: mandatoryMiddleware,
quit: make(chan struct{}),
}
}
// Start starts the InterceptorChain, which is needed to start the state
// subscription server it powers.
func (r *InterceptorChain) Start() error {
var err error
r.started.Do(func() {
err = r.ntfnServer.Start()
})
return err
}
// Stop stops the InterceptorChain and its internal state subscription server.
func (r *InterceptorChain) Stop() error {
var err error
r.stopped.Do(func() {
close(r.quit)
err = r.ntfnServer.Stop()
})
return err
}
// SetWalletNotCreated moves the RPC state from either waitingToStart to
// walletNotCreated.
func (r *InterceptorChain) SetWalletNotCreated() {
r.Lock()
defer r.Unlock()
r.state = walletNotCreated
_ = r.ntfnServer.SendUpdate(r.state)
}
// SetWalletLocked moves the RPC state from either walletNotCreated to
// walletLocked.
func (r *InterceptorChain) SetWalletLocked() {
r.Lock()
defer r.Unlock()
r.state = walletLocked
_ = r.ntfnServer.SendUpdate(r.state)
}
// SetWalletUnlocked moves the RPC state from either walletNotCreated or
// walletLocked to walletUnlocked.
func (r *InterceptorChain) SetWalletUnlocked() {
r.Lock()
defer r.Unlock()
r.state = walletUnlocked
_ = r.ntfnServer.SendUpdate(r.state)
}
// SetRPCActive moves the RPC state from walletUnlocked to rpcActive.
func (r *InterceptorChain) SetRPCActive() {
r.Lock()
defer r.Unlock()
r.state = rpcActive
_ = r.ntfnServer.SendUpdate(r.state)
}
// SetServerActive moves the RPC state from walletUnlocked to rpcActive.
func (r *InterceptorChain) SetServerActive() {
r.Lock()
defer r.Unlock()
r.state = serverActive
_ = r.ntfnServer.SendUpdate(r.state)
}
// rpcStateToWalletState converts rpcState to lnrpc.WalletState. Returns
// WAITING_TO_START and an error on conversion error.
func rpcStateToWalletState(state rpcState) (lnrpc.WalletState, error) {
const defaultState = lnrpc.WalletState_WAITING_TO_START
var walletState lnrpc.WalletState
switch state {
case waitingToStart:
walletState = lnrpc.WalletState_WAITING_TO_START
case walletNotCreated:
walletState = lnrpc.WalletState_NON_EXISTING
case walletLocked:
walletState = lnrpc.WalletState_LOCKED
case walletUnlocked:
walletState = lnrpc.WalletState_UNLOCKED
case rpcActive:
walletState = lnrpc.WalletState_RPC_ACTIVE
case serverActive:
walletState = lnrpc.WalletState_SERVER_ACTIVE
default:
return defaultState, fmt.Errorf("unknown wallet state %v", state)
}
return walletState, nil
}
// SubscribeState subscribes to the state of the wallet. The current wallet
// state will always be delivered immediately.
//
// NOTE: Part of the StateService interface.
func (r *InterceptorChain) SubscribeState(_ *lnrpc.SubscribeStateRequest,
stream lnrpc.State_SubscribeStateServer) error {
sendStateUpdate := func(state rpcState) error {
walletState, err := rpcStateToWalletState(state)
if err != nil {
return err
}
return stream.Send(&lnrpc.SubscribeStateResponse{
State: walletState,
})
}
// Subscribe to state updates.
client, err := r.ntfnServer.Subscribe()
if err != nil {
return err
}
defer client.Cancel()
// Always start by sending the current state.
r.RLock()
state := r.state
r.RUnlock()
if err := sendStateUpdate(state); err != nil {
return err
}
for {
select {
case e := <-client.Updates():
newState := e.(rpcState)
// Ignore already sent state.
if newState == state {
continue
}
state = newState
err := sendStateUpdate(state)
if err != nil {
return err
}
// The response stream's context for whatever reason has been
// closed. If context is closed by an exceeded deadline we will
// return an error.
case <-stream.Context().Done():
if errors.Is(stream.Context().Err(), context.Canceled) {
return nil
}
return stream.Context().Err()
case <-r.quit:
return fmt.Errorf("server exiting")
}
}
}
// GetState returns the current wallet state.
func (r *InterceptorChain) GetState(_ context.Context,
_ *lnrpc.GetStateRequest) (*lnrpc.GetStateResponse, error) {
r.RLock()
state := r.state
r.RUnlock()
walletState, err := rpcStateToWalletState(state)
if err != nil {
return nil, err
}
return &lnrpc.GetStateResponse{
State: walletState,
}, nil
}
// AddMacaroonService adds a macaroon service to the interceptor. After this is
// done every RPC call made will have to pass a valid macaroon to be accepted.
func (r *InterceptorChain) AddMacaroonService(svc *macaroons.Service) {
r.Lock()
defer r.Unlock()
r.svc = svc
}
// MacaroonService returns the currently registered macaroon service. This might
// be nil if none was registered (yet).
func (r *InterceptorChain) MacaroonService() *macaroons.Service {
r.RLock()
defer r.RUnlock()
return r.svc
}
// AddPermission adds a new macaroon rule for the given method.
func (r *InterceptorChain) AddPermission(method string, ops []bakery.Op) error {
r.Lock()
defer r.Unlock()
if _, ok := r.permissionMap[method]; ok {
return fmt.Errorf("detected duplicate macaroon constraints "+
"for path: %v", method)
}
r.permissionMap[method] = ops
return nil
}
// Permissions returns the current set of macaroon permissions.
func (r *InterceptorChain) Permissions() map[string][]bakery.Op {
r.RLock()
defer r.RUnlock()
// Make a copy under the read lock to avoid races.
c := make(map[string][]bakery.Op)
for k, v := range r.permissionMap {
s := make([]bakery.Op, len(v))
copy(s, v)
c[k] = s
}
return c
}
// RegisterMiddleware registers a new middleware that will handle request/
// response interception for all RPC messages that are initiated with a custom
// macaroon caveat. The name of the custom caveat a middleware is handling is
// also its unique identifier. Only one middleware can be registered for each
// custom caveat.
func (r *InterceptorChain) RegisterMiddleware(mw *MiddlewareHandler) error {
r.Lock()
defer r.Unlock()
// The name of the middleware is the unique identifier.
registered, ok := r.registeredMiddleware[mw.middlewareName]
if ok {
return fmt.Errorf("a middleware with the name '%s' is already "+
"registered", registered.middlewareName)
}
// For now, we only want one middleware per custom caveat name. If we
// allowed multiple middlewares handling the same caveat there would be
// a need for extra call chaining logic, and they could overwrite each
// other's responses.
for name, middleware := range r.registeredMiddleware {
if middleware.customCaveatName == mw.customCaveatName {
return fmt.Errorf("a middleware is already registered "+
"for the custom caveat name '%s': %v",
mw.customCaveatName, name)
}
}
r.registeredMiddleware[mw.middlewareName] = mw
return nil
}
// RemoveMiddleware removes the middleware that handles the given custom caveat
// name.
func (r *InterceptorChain) RemoveMiddleware(middlewareName string) {
r.Lock()
defer r.Unlock()
log.Debugf("Removing middleware %s", middlewareName)
delete(r.registeredMiddleware, middlewareName)
}
// CustomCaveatSupported makes sure a middleware that handles the given custom
// caveat name is registered. If none is, an error is returned, signalling to
// the macaroon bakery and its validator to reject macaroons that have a custom
// caveat with that name.
//
// NOTE: This method is part of the macaroons.CustomCaveatAcceptor interface.
func (r *InterceptorChain) CustomCaveatSupported(customCaveatName string) error {
r.RLock()
defer r.RUnlock()
// We only accept requests with a custom caveat if we also have a
// middleware registered that handles that custom caveat. That is
// crucial for security! Otherwise a request with an encumbered (=has
// restricted permissions based upon the custom caveat condition)
// macaroon would not be validated against the limitations that the
// custom caveat implicate. Since the map is keyed by the _name_ of the
// middleware, we need to loop through all of them to see if one has
// the given custom macaroon caveat name.
for _, middleware := range r.registeredMiddleware {
if middleware.customCaveatName == customCaveatName {
return nil
}
}
return fmt.Errorf("cannot accept macaroon with custom caveat '%s', "+
"no middleware registered to handle it", customCaveatName)
}
// CreateServerOpts creates the GRPC server options that can be added to a GRPC
// server in order to add this InterceptorChain.
func (r *InterceptorChain) CreateServerOpts() []grpc.ServerOption {
var unaryInterceptors []grpc.UnaryServerInterceptor
var strmInterceptors []grpc.StreamServerInterceptor
// The first interceptors we'll add to the chain is our logging
// interceptors, so we can automatically log all errors that happen
// during RPC calls.
unaryInterceptors = append(
unaryInterceptors, errorLogUnaryServerInterceptor(r.rpcsLog),
)
strmInterceptors = append(
strmInterceptors, errorLogStreamServerInterceptor(r.rpcsLog),
)
// Next we'll add our RPC state check interceptors, that will check
// whether the attempted call is allowed in the current state.
unaryInterceptors = append(
unaryInterceptors, r.rpcStateUnaryServerInterceptor(),
)
strmInterceptors = append(
strmInterceptors, r.rpcStateStreamServerInterceptor(),
)
// We'll add the macaroon interceptors. If macaroons aren't disabled,
// then these interceptors will enforce macaroon authentication.
unaryInterceptors = append(
unaryInterceptors, r.MacaroonUnaryServerInterceptor(),
)
strmInterceptors = append(
strmInterceptors, r.MacaroonStreamServerInterceptor(),
)
// Next, we'll add the interceptors for our custom macaroon caveat based
// middleware.
unaryInterceptors = append(
unaryInterceptors, r.middlewareUnaryServerInterceptor(),
)
strmInterceptors = append(
strmInterceptors, r.middlewareStreamServerInterceptor(),
)
// Get interceptors for Prometheus to gather gRPC performance metrics.
// If monitoring is disabled, GetPromInterceptors() will return empty
// slices.
promUnaryInterceptors, promStrmInterceptors :=
monitoring.GetPromInterceptors()
// Concatenate the slices of unary and stream interceptors respectively.
unaryInterceptors = append(unaryInterceptors, promUnaryInterceptors...)
strmInterceptors = append(strmInterceptors, promStrmInterceptors...)
// Create server options from the interceptors we just set up.
chainedUnary := grpc_middleware.WithUnaryServerChain(
unaryInterceptors...,
)
chainedStream := grpc_middleware.WithStreamServerChain(
strmInterceptors...,
)
serverOpts := []grpc.ServerOption{chainedUnary, chainedStream}
return serverOpts
}
// errorLogUnaryServerInterceptor is a simple UnaryServerInterceptor that will
// automatically log any errors that occur when serving a client's unary
// request.
func errorLogUnaryServerInterceptor(logger btclog.Logger) grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler) (interface{}, error) {
resp, err := handler(ctx, req)
if err != nil {
// TODO(roasbeef): also log request details?
logger.Errorf("[%v]: %v", info.FullMethod, err)
}
return resp, err
}
}
// errorLogStreamServerInterceptor is a simple StreamServerInterceptor that
// will log any errors that occur while processing a client or server streaming
// RPC.
func errorLogStreamServerInterceptor(logger btclog.Logger) grpc.StreamServerInterceptor {
return func(srv interface{}, ss grpc.ServerStream,
info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
err := handler(srv, ss)
if err != nil {
logger.Errorf("[%v]: %v", info.FullMethod, err)
}
return err
}
}
// checkMacaroon validates that the context contains the macaroon needed to
// invoke the given RPC method.
func (r *InterceptorChain) checkMacaroon(ctx context.Context,
fullMethod string) error {
// If noMacaroons is set, we'll always allow the call.
if r.noMacaroons {
return nil
}
// Check whether the method is whitelisted, if so we'll allow it
// regardless of macaroons.
_, ok := macaroonWhitelist[fullMethod]
if ok {
return nil
}
r.RLock()
svc := r.svc
r.RUnlock()
// If the macaroon service is not yet active, we cannot allow
// the call.
if svc == nil {
return fmt.Errorf("unable to determine macaroon permissions")
}
r.RLock()
uriPermissions, ok := r.permissionMap[fullMethod]
r.RUnlock()
if !ok {
return fmt.Errorf("%s: unknown permissions required for method",
fullMethod)
}
// Find out if there is an external validator registered for
// this method. Fall back to the internal one if there isn't.
validator, ok := svc.ExternalValidators[fullMethod]
if !ok {
validator = svc
}
// Now that we know what validator to use, let it do its work.
return validator.ValidateMacaroon(ctx, uriPermissions, fullMethod)
}
// MacaroonUnaryServerInterceptor is a GRPC interceptor that checks whether the
// request is authorized by the included macaroons.
func (r *InterceptorChain) MacaroonUnaryServerInterceptor() grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler) (interface{}, error) {
// Check macaroons.
if err := r.checkMacaroon(ctx, info.FullMethod); err != nil {
return nil, err
}
return handler(ctx, req)
}
}
// MacaroonStreamServerInterceptor is a GRPC interceptor that checks whether
// the request is authorized by the included macaroons.
func (r *InterceptorChain) MacaroonStreamServerInterceptor() grpc.StreamServerInterceptor {
return func(srv interface{}, ss grpc.ServerStream,
info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
// Check macaroons.
err := r.checkMacaroon(ss.Context(), info.FullMethod)
if err != nil {
return err
}
return handler(srv, ss)
}
}
// checkRPCState checks whether a call to the given server is allowed in the
// current RPC state.
func (r *InterceptorChain) checkRPCState(srv interface{}) error {
// The StateService is being accessed, we allow the call regardless of
// the current state.
_, ok := srv.(lnrpc.StateServer)
if ok {
return nil
}
r.RLock()
state := r.state
r.RUnlock()
switch state {
// Do not accept any RPC calls (unless to the state service) until broln
// has not started.
case waitingToStart:
return ErrWaitingToStart
// If the wallet does not exists, only calls to the WalletUnlocker are
// accepted.
case walletNotCreated:
_, ok := srv.(lnrpc.WalletUnlockerServer)
if !ok {
return ErrNoWallet
}
// If the wallet is locked, only calls to the WalletUnlocker are
// accepted.
case walletLocked:
_, ok := srv.(lnrpc.WalletUnlockerServer)
if !ok {
return ErrWalletLocked
}
// If the wallet is unlocked, but the RPC not yet active, we reject.
case walletUnlocked:
_, ok := srv.(lnrpc.WalletUnlockerServer)
if ok {
return ErrWalletUnlocked
}
return ErrRPCStarting
// If the RPC server or broln server is active, we allow calls to any
// service except the WalletUnlocker.
case rpcActive, serverActive:
_, ok := srv.(lnrpc.WalletUnlockerServer)
if ok {
return ErrWalletUnlocked
}
default:
return fmt.Errorf("unknown RPC state: %v", state)
}
return nil
}
// rpcStateUnaryServerInterceptor is a GRPC interceptor that checks whether
// calls to the given gGRPC server is allowed in the current rpc state.
func (r *InterceptorChain) rpcStateUnaryServerInterceptor() grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler) (interface{}, error) {
if err := r.checkRPCState(info.Server); err != nil {
return nil, err
}
return handler(ctx, req)
}
}
// rpcStateStreamServerInterceptor is a GRPC interceptor that checks whether
// calls to the given gGRPC server is allowed in the current rpc state.
func (r *InterceptorChain) rpcStateStreamServerInterceptor() grpc.StreamServerInterceptor {
return func(srv interface{}, ss grpc.ServerStream,
info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
if err := r.checkRPCState(srv); err != nil {
return err
}
return handler(srv, ss)
}
}
// middlewareUnaryServerInterceptor is a unary gRPC interceptor that intercepts
// all requests and responses that are sent with a macaroon containing a custom
// caveat condition that is handled by registered middleware.
func (r *InterceptorChain) middlewareUnaryServerInterceptor() grpc.UnaryServerInterceptor {
return func(ctx context.Context,
req interface{}, info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler) (interface{}, error) {
// Make sure we don't allow any requests through if one of the
// mandatory middlewares is missing.
fullMethod := info.FullMethod
if err := r.checkMandatoryMiddleware(fullMethod); err != nil {
return nil, err
}
// If there is no middleware registered, we don't need to
// intercept anything.
if !r.middlewareRegistered() {
return handler(ctx, req)
}
msg, err := NewMessageInterceptionRequest(
ctx, TypeRequest, false, info.FullMethod, req,
)
if err != nil {
return nil, err
}
requestID := atomic.AddUint64(&r.lastRequestID, 1)
err = r.acceptRequest(requestID, msg)
if err != nil {
return nil, err
}
resp, respErr := handler(ctx, req)
if respErr != nil {
return resp, respErr
}
return r.interceptResponse(
ctx, requestID, false, info.FullMethod, resp,
)
}
}
// middlewareStreamServerInterceptor is a streaming gRPC interceptor that
// intercepts all requests and responses that are sent with a macaroon
// containing a custom caveat condition that is handled by registered
// middleware.
func (r *InterceptorChain) middlewareStreamServerInterceptor() grpc.StreamServerInterceptor {
return func(srv interface{},
ss grpc.ServerStream, info *grpc.StreamServerInfo,
handler grpc.StreamHandler) error {
// Don't intercept the interceptor itself which is a streaming
// RPC too!
fullMethod := info.FullMethod
if fullMethod == lnrpc.RegisterRPCMiddlewareURI {
return handler(srv, ss)
}
// Make sure we don't allow any requests through if one of the
// mandatory middlewares is missing. We add this check here to
// make sure the middleware registration itself can still be
// called.
if err := r.checkMandatoryMiddleware(fullMethod); err != nil {
return err
}
// If there is no middleware registered, we don't need to
// intercept anything.
if !r.middlewareRegistered() {
return handler(srv, ss)
}
// To give the middleware a chance to accept or reject the
// establishment of the stream itself (and not only when the
// first message is sent on the stream), we send an intercept
// request for the stream auth now:
msg, err := NewStreamAuthInterceptionRequest(
ss.Context(), info.FullMethod,
)
if err != nil {
return err
}
requestID := atomic.AddUint64(&r.lastRequestID, 1)
err = r.acceptRequest(requestID, msg)
if err != nil {
return err
}
wrappedSS := &serverStreamWrapper{
ServerStream: ss,
requestID: requestID,
fullMethod: info.FullMethod,
interceptor: r,
}
return handler(srv, wrappedSS)
}
}
// checkMandatoryMiddleware makes sure that each of the middlewares declared as
// mandatory is currently registered.
func (r *InterceptorChain) checkMandatoryMiddleware(fullMethod string) error {
r.RLock()
defer r.RUnlock()
// Allow calls that are whitelisted for macaroons as well, otherwise we
// get into impossible situations where the wallet is locked but the
// unlock call is denied because the middleware isn't registered. But
// the middleware cannot register itself because the wallet is locked.
if _, ok := macaroonWhitelist[fullMethod]; ok {
return nil
}
// Not a white listed call so make sure every mandatory middleware is
// currently connected to broln.
for _, name := range r.mandatoryMiddleware {
if _, ok := r.registeredMiddleware[name]; !ok {
return fmt.Errorf("mandatory middleware '%s' is "+
"currently not registered, not allowing any "+
"RPC calls", name)
}
}
return nil
}
// middlewareRegistered returns true if there is at least one middleware
// currently registered.
func (r *InterceptorChain) middlewareRegistered() bool {
r.RLock()
defer r.RUnlock()
return len(r.registeredMiddleware) > 0
}
// acceptRequest sends an intercept request to all middlewares that have
// registered for it. This means either a middleware has requested read-only
// access or the request actually has a macaroon with a caveat the middleware
// registered for.
func (r *InterceptorChain) acceptRequest(requestID uint64,
msg *InterceptionRequest) error {
r.RLock()
defer r.RUnlock()
for _, middleware := range r.registeredMiddleware {
// If there is a custom caveat in the macaroon, make sure the
// middleware registered for it. Or if a middleware registered
// for read-only mode, it also gets the request.
hasCustomCaveat := macaroons.HasCustomCaveat(
msg.Macaroon, middleware.customCaveatName,
)
if !hasCustomCaveat && !middleware.readOnly {
continue
}
msg.CustomCaveatCondition = macaroons.GetCustomCaveatCondition(
msg.Macaroon, middleware.customCaveatName,
)
resp, err := middleware.intercept(requestID, msg)
// Error during interception itself.
if err != nil {
return err
}
// Error returned from middleware client.
if resp.err != nil {
return resp.err
}
}
return nil
}
// interceptResponse sends out an intercept request for an RPC response. Since
// middleware that hasn't registered for the read-only mode has the option to
// overwrite/replace the response, this needs to be handled differently than the
// request/auth path above.
func (r *InterceptorChain) interceptResponse(ctx context.Context,
requestID uint64, isStream bool, fullMethod string,
m interface{}) (interface{}, error) {
r.RLock()
defer r.RUnlock()
currentMessage := m
for _, middleware := range r.registeredMiddleware {
msg, err := NewMessageInterceptionRequest(
ctx, TypeResponse, isStream, fullMethod, currentMessage,
)
if err != nil {
return nil, err
}
// If there is a custom caveat in the macaroon, make sure the
// middleware registered for it. Or if a middleware registered
// for read-only mode, it also gets the request.
hasCustomCaveat := macaroons.HasCustomCaveat(
msg.Macaroon, middleware.customCaveatName,
)
if !hasCustomCaveat && !middleware.readOnly {
continue
}
msg.CustomCaveatCondition = macaroons.GetCustomCaveatCondition(
msg.Macaroon, middleware.customCaveatName,
)
resp, err := middleware.intercept(requestID, msg)
// Error during interception itself.
if err != nil {
return nil, err
}
// Error returned from middleware client.
if resp.err != nil {
return nil, resp.err
}
// The message was replaced, make sure the next middleware in
// line receives the updated message.
if !middleware.readOnly && resp.replace {
currentMessage = resp.replacement