-
Notifications
You must be signed in to change notification settings - Fork 2.1k
/
brontide.go
3092 lines (2620 loc) · 97.5 KB
/
brontide.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
package peer
import (
"bytes"
"container/list"
"errors"
"fmt"
"net"
"sync"
"sync/atomic"
"time"
"github.com/btcsuite/btcd/btcec"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/connmgr"
"github.com/btcsuite/btcd/txscript"
"github.com/btcsuite/btcd/wire"
"github.com/davecgh/go-spew/spew"
"github.com/lightningnetwork/lnd/buffer"
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/channelnotifier"
"github.com/lightningnetwork/lnd/contractcourt"
"github.com/lightningnetwork/lnd/discovery"
"github.com/lightningnetwork/lnd/feature"
"github.com/lightningnetwork/lnd/funding"
"github.com/lightningnetwork/lnd/htlcswitch"
"github.com/lightningnetwork/lnd/htlcswitch/hodl"
"github.com/lightningnetwork/lnd/htlcswitch/hop"
"github.com/lightningnetwork/lnd/input"
"github.com/lightningnetwork/lnd/invoices"
"github.com/lightningnetwork/lnd/lnpeer"
"github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwallet/chainfee"
"github.com/lightningnetwork/lnd/lnwallet/chancloser"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/netann"
"github.com/lightningnetwork/lnd/pool"
"github.com/lightningnetwork/lnd/queue"
"github.com/lightningnetwork/lnd/ticker"
"github.com/lightningnetwork/lnd/watchtower/wtclient"
)
const (
// pingInterval is the interval at which ping messages are sent.
pingInterval = 1 * time.Minute
// idleTimeout is the duration of inactivity before we time out a peer.
idleTimeout = 5 * time.Minute
// writeMessageTimeout is the timeout used when writing a message to the
// peer.
writeMessageTimeout = 5 * time.Second
// readMessageTimeout is the timeout used when reading a message from a
// peer.
readMessageTimeout = 5 * time.Second
// handshakeTimeout is the timeout used when waiting for the peer's init
// message.
handshakeTimeout = 15 * time.Second
// outgoingQueueLen is the buffer size of the channel which houses
// messages to be sent across the wire, requested by objects outside
// this struct.
outgoingQueueLen = 50
// ErrorBufferSize is the number of historic peer errors that we store.
ErrorBufferSize = 10
)
var (
// ErrChannelNotFound is an error returned when a channel is queried and
// either the Brontide doesn't know of it, or the channel in question
// is pending.
ErrChannelNotFound = fmt.Errorf("channel not found")
)
// outgoingMsg packages an lnwire.Message to be sent out on the wire, along with
// a buffered channel which will be sent upon once the write is complete. This
// buffered channel acts as a semaphore to be used for synchronization purposes.
type outgoingMsg struct {
priority bool
msg lnwire.Message
errChan chan error // MUST be buffered.
}
// newChannelMsg packages a channeldb.OpenChannel with a channel that allows
// the receiver of the request to report when the channel creation process has
// completed.
type newChannelMsg struct {
channel *channeldb.OpenChannel
err chan error
}
// closeMsg is a wrapper struct around any wire messages that deal with the
// cooperative channel closure negotiation process. This struct includes the
// raw channel ID targeted along with the original message.
type closeMsg struct {
cid lnwire.ChannelID
msg lnwire.Message
}
// PendingUpdate describes the pending state of a closing channel.
type PendingUpdate struct {
Txid []byte
OutputIndex uint32
}
// ChannelCloseUpdate contains the outcome of the close channel operation.
type ChannelCloseUpdate struct {
ClosingTxid []byte
Success bool
}
// TimestampedError is a timestamped error that is used to store the most recent
// errors we have experienced with our peers.
type TimestampedError struct {
Error error
Timestamp time.Time
}
// Config defines configuration fields that are necessary for a peer object
// to function.
type Config struct {
// Conn is the underlying network connection for this peer.
Conn MessageConn
// ConnReq stores information related to the persistent connection request
// for this peer.
ConnReq *connmgr.ConnReq
// PubKeyBytes is the serialized, compressed public key of this peer.
PubKeyBytes [33]byte
// Addr is the network address of the peer.
Addr *lnwire.NetAddress
// Inbound indicates whether or not the peer is an inbound peer.
Inbound bool
// Features is the set of features that we advertise to the remote party.
Features *lnwire.FeatureVector
// LegacyFeatures is the set of features that we advertise to the remote
// peer for backwards compatibility. Nodes that have not implemented
// flat features will still be able to read our feature bits from the
// legacy global field, but we will also advertise everything in the
// default features field.
LegacyFeatures *lnwire.FeatureVector
// OutgoingCltvRejectDelta defines the number of blocks before expiry of
// an htlc where we don't offer it anymore.
OutgoingCltvRejectDelta uint32
// ChanActiveTimeout specifies the duration the peer will wait to request
// a channel reenable, beginning from the time the peer was started.
ChanActiveTimeout time.Duration
// ErrorBuffer stores a set of errors related to a peer. It contains error
// messages that our peer has recently sent us over the wire and records of
// unknown messages that were sent to us so that we can have a full track
// record of the communication errors we have had with our peer. If we
// choose to disconnect from a peer, it also stores the reason we had for
// disconnecting.
ErrorBuffer *queue.CircularBuffer
// WritePool is the task pool that manages reuse of write buffers. Write
// tasks are submitted to the pool in order to conserve the total number of
// write buffers allocated at any one time, and decouple write buffer
// allocation from the peer life cycle.
WritePool *pool.Write
// ReadPool is the task pool that manages reuse of read buffers.
ReadPool *pool.Read
// Switch is a pointer to the htlcswitch. It is used to setup, get, and
// tear-down ChannelLinks.
Switch *htlcswitch.Switch
// InterceptSwitch is a pointer to the InterceptableSwitch, a wrapper around
// the regular Switch. We only export it here to pass ForwardPackets to the
// ChannelLinkConfig.
InterceptSwitch *htlcswitch.InterceptableSwitch
// ChannelDB is used to fetch opened channels, and closed channels.
ChannelDB *channeldb.DB
// ChannelGraph is a pointer to the channel graph which is used to
// query information about the set of known active channels.
ChannelGraph *channeldb.ChannelGraph
// ChainArb is used to subscribe to channel events, update contract signals,
// and force close channels.
ChainArb *contractcourt.ChainArbitrator
// AuthGossiper is needed so that the Brontide impl can register with the
// gossiper and process remote channel announcements.
AuthGossiper *discovery.AuthenticatedGossiper
// ChanStatusMgr is used to set or un-set the disabled bit in channel
// updates.
ChanStatusMgr *netann.ChanStatusManager
// ChainIO is used to retrieve the best block.
ChainIO lnwallet.BlockChainIO
// FeeEstimator is used to compute our target ideal fee-per-kw when
// initializing the coop close process.
FeeEstimator chainfee.Estimator
// Signer is used when creating *lnwallet.LightningChannel instances.
Signer input.Signer
// SigPool is used when creating *lnwallet.LightningChannel instances.
SigPool *lnwallet.SigPool
// Wallet is used to publish transactions and generates delivery
// scripts during the coop close process.
Wallet *lnwallet.LightningWallet
// ChainNotifier is used to receive confirmations of a coop close
// transaction.
ChainNotifier chainntnfs.ChainNotifier
// RoutingPolicy is used to set the forwarding policy for links created by
// the Brontide.
RoutingPolicy htlcswitch.ForwardingPolicy
// Sphinx is used when setting up ChannelLinks so they can decode sphinx
// onion blobs.
Sphinx *hop.OnionProcessor
// WitnessBeacon is used when setting up ChannelLinks so they can add any
// preimages that they learn.
WitnessBeacon contractcourt.WitnessBeacon
// Invoices is passed to the ChannelLink on creation and handles all
// invoice-related logic.
Invoices *invoices.InvoiceRegistry
// ChannelNotifier is used by the link to notify other sub-systems about
// channel-related events and by the Brontide to subscribe to
// ActiveLinkEvents.
ChannelNotifier *channelnotifier.ChannelNotifier
// HtlcNotifier is used when creating a ChannelLink.
HtlcNotifier *htlcswitch.HtlcNotifier
// TowerClient is used by legacy channels to backup revoked states.
TowerClient wtclient.Client
// AnchorTowerClient is used by anchor channels to backup revoked
// states.
AnchorTowerClient wtclient.Client
// DisconnectPeer is used to disconnect this peer if the cooperative close
// process fails.
DisconnectPeer func(*btcec.PublicKey) error
// GenNodeAnnouncement is used to send our node announcement to the remote
// on startup.
GenNodeAnnouncement func(bool,
...netann.NodeAnnModifier) (lnwire.NodeAnnouncement, error)
// PrunePersistentPeerConnection is used to remove all internal state
// related to this peer in the server.
PrunePersistentPeerConnection func([33]byte)
// FetchLastChanUpdate fetches our latest channel update for a target
// channel.
FetchLastChanUpdate func(lnwire.ShortChannelID) (*lnwire.ChannelUpdate,
error)
// FundingManager is an implementation of the funding.Controller interface.
FundingManager funding.Controller
// Hodl is used when creating ChannelLinks to specify HodlFlags as
// breakpoints in dev builds.
Hodl *hodl.Config
// UnsafeReplay is used when creating ChannelLinks to specify whether or
// not to replay adds on its commitment tx.
UnsafeReplay bool
// MaxOutgoingCltvExpiry is used when creating ChannelLinks and is the max
// number of blocks that funds could be locked up for when forwarding
// payments.
MaxOutgoingCltvExpiry uint32
// MaxChannelFeeAllocation is used when creating ChannelLinks and is the
// maximum percentage of total funds that can be allocated to a channel's
// commitment fee. This only applies for the initiator of the channel.
MaxChannelFeeAllocation float64
// MaxAnchorsCommitFeeRate is the maximum fee rate we'll use as an
// initiator for anchor channel commitments.
MaxAnchorsCommitFeeRate chainfee.SatPerKWeight
// ServerPubKey is the serialized, compressed public key of our lnd node.
// It is used to determine which policy (channel edge) to pass to the
// ChannelLink.
ServerPubKey [33]byte
// Quit is the server's quit channel. If this is closed, we halt operation.
Quit chan struct{}
}
// Brontide is an active peer on the Lightning Network. This struct is responsible
// for managing any channel state related to this peer. To do so, it has
// several helper goroutines to handle events such as HTLC timeouts, new
// funding workflow, and detecting an uncooperative closure of any active
// channels.
// TODO(roasbeef): proper reconnection logic
type Brontide struct {
// MUST be used atomically.
started int32
disconnect int32
// MUST be used atomically.
bytesReceived uint64
bytesSent uint64
// pingTime is a rough estimate of the RTT (round-trip-time) between us
// and the connected peer. This time is expressed in microseconds.
// To be used atomically.
// TODO(roasbeef): also use a WMA or EMA?
pingTime int64
// pingLastSend is the Unix time expressed in nanoseconds when we sent
// our last ping message. To be used atomically.
pingLastSend int64
cfg Config
// activeSignal when closed signals that the peer is now active and
// ready to process messages.
activeSignal chan struct{}
// startTime is the time this peer connection was successfully established.
// It will be zero for peers that did not successfully call Start().
startTime time.Time
// sendQueue is the channel which is used to queue outgoing messages to be
// written onto the wire. Note that this channel is unbuffered.
sendQueue chan outgoingMsg
// outgoingQueue is a buffered channel which allows second/third party
// objects to queue messages to be sent out on the wire.
outgoingQueue chan outgoingMsg
// activeChanMtx protects access to the activeChannels and
// addedChannels maps.
activeChanMtx sync.RWMutex
// activeChannels is a map which stores the state machines of all
// active channels. Channels are indexed into the map by the txid of
// the funding transaction which opened the channel.
//
// NOTE: On startup, pending channels are stored as nil in this map.
// Confirmed channels have channel data populated in the map. This means
// that accesses to this map should nil-check the LightningChannel to
// see if this is a pending channel or not. The tradeoff here is either
// having two maps everywhere (one for pending, one for confirmed chans)
// or having an extra nil-check per access.
activeChannels map[lnwire.ChannelID]*lnwallet.LightningChannel
// addedChannels tracks any new channels opened during this peer's
// lifecycle. We use this to filter out these new channels when the time
// comes to request a reenable for active channels, since they will have
// waited a shorter duration.
addedChannels map[lnwire.ChannelID]struct{}
// newChannels is used by the fundingManager to send fully opened
// channels to the source peer which handled the funding workflow.
newChannels chan *newChannelMsg
// activeMsgStreams is a map from channel id to the channel streams that
// proxy messages to individual, active links.
activeMsgStreams map[lnwire.ChannelID]*msgStream
// activeChanCloses is a map that keeps track of all the active
// cooperative channel closures. Any channel closing messages are directed
// to one of these active state machines. Once the channel has been closed,
// the state machine will be deleted from the map.
activeChanCloses map[lnwire.ChannelID]*chancloser.ChanCloser
// localCloseChanReqs is a channel in which any local requests to close
// a particular channel are sent over.
localCloseChanReqs chan *htlcswitch.ChanClose
// linkFailures receives all reported channel failures from the switch,
// and instructs the channelManager to clean remaining channel state.
linkFailures chan linkFailureReport
// chanCloseMsgs is a channel that any message related to channel
// closures are sent over. This includes lnwire.Shutdown message as
// well as lnwire.ClosingSigned messages.
chanCloseMsgs chan *closeMsg
// remoteFeatures is the feature vector received from the peer during
// the connection handshake.
remoteFeatures *lnwire.FeatureVector
// resentChanSyncMsg is a set that keeps track of which channels we
// have re-sent channel reestablishment messages for. This is done to
// avoid getting into loop where both peers will respond to the other
// peer's chansync message with its own over and over again.
resentChanSyncMsg map[lnwire.ChannelID]struct{}
queueQuit chan struct{}
quit chan struct{}
wg sync.WaitGroup
}
// A compile-time check to ensure that Brontide satisfies the lnpeer.Peer interface.
var _ lnpeer.Peer = (*Brontide)(nil)
// NewBrontide creates a new Brontide from a peer.Config struct.
func NewBrontide(cfg Config) *Brontide {
p := &Brontide{
cfg: cfg,
activeSignal: make(chan struct{}),
sendQueue: make(chan outgoingMsg),
outgoingQueue: make(chan outgoingMsg),
addedChannels: make(map[lnwire.ChannelID]struct{}),
activeChannels: make(map[lnwire.ChannelID]*lnwallet.LightningChannel),
newChannels: make(chan *newChannelMsg, 1),
activeMsgStreams: make(map[lnwire.ChannelID]*msgStream),
activeChanCloses: make(map[lnwire.ChannelID]*chancloser.ChanCloser),
localCloseChanReqs: make(chan *htlcswitch.ChanClose),
linkFailures: make(chan linkFailureReport),
chanCloseMsgs: make(chan *closeMsg),
resentChanSyncMsg: make(map[lnwire.ChannelID]struct{}),
queueQuit: make(chan struct{}),
quit: make(chan struct{}),
}
return p
}
// Start starts all helper goroutines the peer needs for normal operations. In
// the case this peer has already been started, then this function is a noop.
func (p *Brontide) Start() error {
if atomic.AddInt32(&p.started, 1) != 1 {
return nil
}
peerLog.Tracef("Peer %v starting", p)
// Fetch and then load all the active channels we have with this remote
// peer from the database.
activeChans, err := p.cfg.ChannelDB.FetchOpenChannels(
p.cfg.Addr.IdentityKey,
)
if err != nil {
peerLog.Errorf("Unable to fetch active chans "+
"for peer %v: %v", p, err)
return err
}
if len(activeChans) == 0 {
p.cfg.PrunePersistentPeerConnection(p.cfg.PubKeyBytes)
}
// Quickly check if we have any existing legacy channels with this
// peer.
haveLegacyChan := false
for _, c := range activeChans {
if c.ChanType.IsTweakless() {
continue
}
haveLegacyChan = true
break
}
// Exchange local and global features, the init message should be very
// first between two nodes.
if err := p.sendInitMsg(haveLegacyChan); err != nil {
return fmt.Errorf("unable to send init msg: %v", err)
}
// Before we launch any of the helper goroutines off the peer struct,
// we'll first ensure proper adherence to the p2p protocol. The init
// message MUST be sent before any other message.
readErr := make(chan error, 1)
msgChan := make(chan lnwire.Message, 1)
p.wg.Add(1)
go func() {
defer p.wg.Done()
msg, err := p.readNextMessage()
if err != nil {
readErr <- err
msgChan <- nil
return
}
readErr <- nil
msgChan <- msg
}()
select {
// In order to avoid blocking indefinitely, we'll give the other peer
// an upper timeout to respond before we bail out early.
case <-time.After(handshakeTimeout):
return fmt.Errorf("peer did not complete handshake within %v",
handshakeTimeout)
case err := <-readErr:
if err != nil {
return fmt.Errorf("unable to read init msg: %v", err)
}
}
// Once the init message arrives, we can parse it so we can figure out
// the negotiation of features for this session.
msg := <-msgChan
if msg, ok := msg.(*lnwire.Init); ok {
if err := p.handleInitMsg(msg); err != nil {
p.storeError(err)
return err
}
} else {
return errors.New("very first message between nodes " +
"must be init message")
}
// Next, load all the active channels we have with this peer,
// registering them with the switch and launching the necessary
// goroutines required to operate them.
peerLog.Debugf("Loaded %v active channels from database with "+
"NodeKey(%x)", len(activeChans), p.PubKey())
msgs, err := p.loadActiveChannels(activeChans)
if err != nil {
return fmt.Errorf("unable to load channels: %v", err)
}
p.startTime = time.Now()
p.wg.Add(5)
go p.queueHandler()
go p.writeHandler()
go p.readHandler()
go p.channelManager()
go p.pingHandler()
// Signal to any external processes that the peer is now active.
close(p.activeSignal)
// Now that the peer has started up, we send any channel sync messages
// that must be resent for borked channels.
if len(msgs) > 0 {
peerLog.Infof("Sending %d channel sync messages to peer after "+
"loading active channels", len(msgs))
if err := p.SendMessage(true, msgs...); err != nil {
peerLog.Warnf("Failed sending channel sync "+
"messages to peer %v: %v", p, err)
}
}
// Node announcements don't propagate very well throughout the network
// as there isn't a way to efficiently query for them through their
// timestamp, mostly affecting nodes that were offline during the time
// of broadcast. We'll resend our node announcement to the remote peer
// as a best-effort delivery such that it can also propagate to their
// peers. To ensure they can successfully process it in most cases,
// we'll only resend it as long as we have at least one confirmed
// advertised channel with the remote peer.
//
// TODO(wilmer): Remove this once we're able to query for node
// announcements through their timestamps.
p.maybeSendNodeAnn(activeChans)
return nil
}
// initGossipSync initializes either a gossip syncer or an initial routing
// dump, depending on the negotiated synchronization method.
func (p *Brontide) initGossipSync() {
// If the remote peer knows of the new gossip queries feature, then
// we'll create a new gossipSyncer in the AuthenticatedGossiper for it.
if p.remoteFeatures.HasFeature(lnwire.GossipQueriesOptional) {
peerLog.Infof("Negotiated chan series queries with %x",
p.cfg.PubKeyBytes[:])
// Register the peer's gossip syncer with the gossiper.
// This blocks synchronously to ensure the gossip syncer is
// registered with the gossiper before attempting to read
// messages from the remote peer.
//
// TODO(wilmer): Only sync updates from non-channel peers. This
// requires an improved version of the current network
// bootstrapper to ensure we can find and connect to non-channel
// peers.
p.cfg.AuthGossiper.InitSyncState(p)
}
}
// QuitSignal is a method that should return a channel which will be sent upon
// or closed once the backing peer exits. This allows callers using the
// interface to cancel any processing in the event the backing implementation
// exits.
//
// NOTE: Part of the lnpeer.Peer interface.
func (p *Brontide) QuitSignal() <-chan struct{} {
return p.quit
}
// loadActiveChannels creates indexes within the peer for tracking all active
// channels returned by the database. It returns a slice of channel reestablish
// messages that should be sent to the peer immediately, in case we have borked
// channels that haven't been closed yet.
func (p *Brontide) loadActiveChannels(chans []*channeldb.OpenChannel) (
[]lnwire.Message, error) {
// Return a slice of messages to send to the peers in case the channel
// cannot be loaded normally.
var msgs []lnwire.Message
for _, dbChan := range chans {
lnChan, err := lnwallet.NewLightningChannel(
p.cfg.Signer, dbChan, p.cfg.SigPool,
)
if err != nil {
return nil, err
}
chanPoint := &dbChan.FundingOutpoint
chanID := lnwire.NewChanIDFromOutPoint(chanPoint)
peerLog.Infof("NodeKey(%x) loading ChannelPoint(%v)",
p.PubKey(), chanPoint)
// Skip adding any permanently irreconcilable channels to the
// htlcswitch.
if !dbChan.HasChanStatus(channeldb.ChanStatusDefault) &&
!dbChan.HasChanStatus(channeldb.ChanStatusRestored) {
peerLog.Warnf("ChannelPoint(%v) has status %v, won't "+
"start.", chanPoint, dbChan.ChanStatus())
// To help our peer recover from a potential data loss,
// we resend our channel reestablish message if the
// channel is in a borked state. We won't process any
// channel reestablish message sent from the peer, but
// that's okay since the assumption is that we did when
// marking the channel borked.
chanSync, err := dbChan.ChanSyncMsg()
if err != nil {
peerLog.Errorf("Unable to create channel "+
"reestablish message for channel %v: "+
"%v", chanPoint, err)
continue
}
msgs = append(msgs, chanSync)
continue
}
// Before we register this new link with the HTLC Switch, we'll
// need to fetch its current link-layer forwarding policy from
// the database.
graph := p.cfg.ChannelGraph
info, p1, p2, err := graph.FetchChannelEdgesByOutpoint(chanPoint)
if err != nil && err != channeldb.ErrEdgeNotFound {
return nil, err
}
// We'll filter out our policy from the directional channel
// edges based whom the edge connects to. If it doesn't connect
// to us, then we know that we were the one that advertised the
// policy.
//
// TODO(roasbeef): can add helper method to get policy for
// particular channel.
var selfPolicy *channeldb.ChannelEdgePolicy
if info != nil && bytes.Equal(info.NodeKey1Bytes[:],
p.cfg.ServerPubKey[:]) {
selfPolicy = p1
} else {
selfPolicy = p2
}
// If we don't yet have an advertised routing policy, then
// we'll use the current default, otherwise we'll translate the
// routing policy into a forwarding policy.
var forwardingPolicy *htlcswitch.ForwardingPolicy
if selfPolicy != nil {
forwardingPolicy = &htlcswitch.ForwardingPolicy{
MinHTLCOut: selfPolicy.MinHTLC,
MaxHTLC: selfPolicy.MaxHTLC,
BaseFee: selfPolicy.FeeBaseMSat,
FeeRate: selfPolicy.FeeProportionalMillionths,
TimeLockDelta: uint32(selfPolicy.TimeLockDelta),
}
} else {
peerLog.Warnf("Unable to find our forwarding policy "+
"for channel %v, using default values",
chanPoint)
forwardingPolicy = &p.cfg.RoutingPolicy
}
peerLog.Tracef("Using link policy of: %v",
spew.Sdump(forwardingPolicy))
// If the channel is pending, set the value to nil in the
// activeChannels map. This is done to signify that the channel is
// pending. We don't add the link to the switch here - it's the funding
// manager's responsibility to spin up pending channels. Adding them
// here would just be extra work as we'll tear them down when creating
// + adding the final link.
if lnChan.IsPending() {
p.activeChanMtx.Lock()
p.activeChannels[chanID] = nil
p.activeChanMtx.Unlock()
continue
}
// Subscribe to the set of on-chain events for this channel.
chainEvents, err := p.cfg.ChainArb.SubscribeChannelEvents(
*chanPoint,
)
if err != nil {
return nil, err
}
err = p.addLink(
chanPoint, lnChan, forwardingPolicy, chainEvents,
true,
)
if err != nil {
return nil, fmt.Errorf("unable to add link %v to "+
"switch: %v", chanPoint, err)
}
p.activeChanMtx.Lock()
p.activeChannels[chanID] = lnChan
p.activeChanMtx.Unlock()
}
return msgs, nil
}
// addLink creates and adds a new ChannelLink from the specified channel.
func (p *Brontide) addLink(chanPoint *wire.OutPoint,
lnChan *lnwallet.LightningChannel,
forwardingPolicy *htlcswitch.ForwardingPolicy,
chainEvents *contractcourt.ChainEventSubscription,
syncStates bool) error {
// onChannelFailure will be called by the link in case the channel
// fails for some reason.
onChannelFailure := func(chanID lnwire.ChannelID,
shortChanID lnwire.ShortChannelID,
linkErr htlcswitch.LinkFailureError) {
failure := linkFailureReport{
chanPoint: *chanPoint,
chanID: chanID,
shortChanID: shortChanID,
linkErr: linkErr,
}
select {
case p.linkFailures <- failure:
case <-p.quit:
case <-p.cfg.Quit:
}
}
updateContractSignals := func(signals *contractcourt.ContractSignals) error {
return p.cfg.ChainArb.UpdateContractSignals(*chanPoint, signals)
}
chanType := lnChan.State().ChanType
// Select the appropriate tower client based on the channel type. It's
// okay if the clients are disabled altogether and these values are nil,
// as the link will check for nilness before using either.
var towerClient htlcswitch.TowerClient
if chanType.HasAnchors() {
towerClient = p.cfg.AnchorTowerClient
} else {
towerClient = p.cfg.TowerClient
}
linkCfg := htlcswitch.ChannelLinkConfig{
Peer: p,
DecodeHopIterators: p.cfg.Sphinx.DecodeHopIterators,
ExtractErrorEncrypter: p.cfg.Sphinx.ExtractErrorEncrypter,
FetchLastChannelUpdate: p.cfg.FetchLastChanUpdate,
HodlMask: p.cfg.Hodl.Mask(),
Registry: p.cfg.Invoices,
Switch: p.cfg.Switch,
Circuits: p.cfg.Switch.CircuitModifier(),
ForwardPackets: p.cfg.InterceptSwitch.ForwardPackets,
FwrdingPolicy: *forwardingPolicy,
FeeEstimator: p.cfg.FeeEstimator,
PreimageCache: p.cfg.WitnessBeacon,
ChainEvents: chainEvents,
UpdateContractSignals: updateContractSignals,
OnChannelFailure: onChannelFailure,
SyncStates: syncStates,
BatchTicker: ticker.New(50 * time.Millisecond),
FwdPkgGCTicker: ticker.New(time.Hour),
PendingCommitTicker: ticker.New(time.Minute),
BatchSize: 10,
UnsafeReplay: p.cfg.UnsafeReplay,
MinFeeUpdateTimeout: htlcswitch.DefaultMinLinkFeeUpdateTimeout,
MaxFeeUpdateTimeout: htlcswitch.DefaultMaxLinkFeeUpdateTimeout,
OutgoingCltvRejectDelta: p.cfg.OutgoingCltvRejectDelta,
TowerClient: towerClient,
MaxOutgoingCltvExpiry: p.cfg.MaxOutgoingCltvExpiry,
MaxFeeAllocation: p.cfg.MaxChannelFeeAllocation,
MaxAnchorsCommitFeeRate: p.cfg.MaxAnchorsCommitFeeRate,
NotifyActiveLink: p.cfg.ChannelNotifier.NotifyActiveLinkEvent,
NotifyActiveChannel: p.cfg.ChannelNotifier.NotifyActiveChannelEvent,
NotifyInactiveChannel: p.cfg.ChannelNotifier.NotifyInactiveChannelEvent,
HtlcNotifier: p.cfg.HtlcNotifier,
}
link := htlcswitch.NewChannelLink(linkCfg, lnChan)
// Before adding our new link, purge the switch of any pending or live
// links going by the same channel id. If one is found, we'll shut it
// down to ensure that the mailboxes are only ever under the control of
// one link.
p.cfg.Switch.RemoveLink(link.ChanID())
// With the channel link created, we'll now notify the htlc switch so
// this channel can be used to dispatch local payments and also
// passively forward payments.
return p.cfg.Switch.AddLink(link)
}
// maybeSendNodeAnn sends our node announcement to the remote peer if at least
// one confirmed public channel exists with them.
func (p *Brontide) maybeSendNodeAnn(channels []*channeldb.OpenChannel) {
hasConfirmedPublicChan := false
for _, channel := range channels {
if channel.IsPending {
continue
}
if channel.ChannelFlags&lnwire.FFAnnounceChannel == 0 {
continue
}
hasConfirmedPublicChan = true
break
}
if !hasConfirmedPublicChan {
return
}
ourNodeAnn, err := p.cfg.GenNodeAnnouncement(false)
if err != nil {
peerLog.Debugf("Unable to retrieve node announcement: %v", err)
return
}
if err := p.SendMessageLazy(false, &ourNodeAnn); err != nil {
peerLog.Debugf("Unable to resend node announcement to %x: %v",
p.cfg.PubKeyBytes, err)
}
}
// WaitForDisconnect waits until the peer has disconnected. A peer may be
// disconnected if the local or remote side terminates the connection, or an
// irrecoverable protocol error has been encountered. This method will only
// begin watching the peer's waitgroup after the ready channel or the peer's
// quit channel are signaled. The ready channel should only be signaled if a
// call to Start returns no error. Otherwise, if the peer fails to start,
// calling Disconnect will signal the quit channel and the method will not
// block, since no goroutines were spawned.
func (p *Brontide) WaitForDisconnect(ready chan struct{}) {
select {
case <-ready:
case <-p.quit:
}
p.wg.Wait()
}
// Disconnect terminates the connection with the remote peer. Additionally, a
// signal is sent to the server and htlcSwitch indicating the resources
// allocated to the peer can now be cleaned up.
func (p *Brontide) Disconnect(reason error) {
if !atomic.CompareAndSwapInt32(&p.disconnect, 0, 1) {
return
}
err := fmt.Errorf("disconnecting %s, reason: %v", p, reason)
p.storeError(err)
peerLog.Infof(err.Error())
// Ensure that the TCP connection is properly closed before continuing.
p.cfg.Conn.Close()
close(p.quit)
}
// String returns the string representation of this peer.
func (p *Brontide) String() string {
return fmt.Sprintf("%x@%s", p.cfg.PubKeyBytes, p.cfg.Conn.RemoteAddr())
}
// readNextMessage reads, and returns the next message on the wire along with
// any additional raw payload.
func (p *Brontide) readNextMessage() (lnwire.Message, error) {
noiseConn := p.cfg.Conn
err := noiseConn.SetReadDeadline(time.Time{})
if err != nil {
return nil, err
}
pktLen, err := noiseConn.ReadNextHeader()
if err != nil {
return nil, err
}
// First we'll read the next _full_ message. We do this rather than
// reading incrementally from the stream as the Lightning wire protocol
// is message oriented and allows nodes to pad on additional data to
// the message stream.
var rawMsg []byte
err = p.cfg.ReadPool.Submit(func(buf *buffer.Read) error {
// Before reading the body of the message, set the read timeout
// accordingly to ensure we don't block other readers using the
// pool. We do so only after the task has been scheduled to
// ensure the deadline doesn't expire while the message is in
// the process of being scheduled.
readDeadline := time.Now().Add(readMessageTimeout)
readErr := noiseConn.SetReadDeadline(readDeadline)
if readErr != nil {
return readErr
}
rawMsg, readErr = noiseConn.ReadNextBody(buf[:pktLen])
return readErr
})
atomic.AddUint64(&p.bytesReceived, uint64(len(rawMsg)))
if err != nil {
return nil, err
}
// Next, create a new io.Reader implementation from the raw message,
// and use this to decode the message directly from.
msgReader := bytes.NewReader(rawMsg)
nextMsg, err := lnwire.ReadMessage(msgReader, 0)
if err != nil {
return nil, err
}
p.logWireMessage(nextMsg, true)
return nextMsg, nil
}
// msgStream implements a goroutine-safe, in-order stream of messages to be
// delivered via closure to a receiver. These messages MUST be in order due to
// the nature of the lightning channel commitment and gossiper state machines.
// TODO(conner): use stream handler interface to abstract out stream
// state/logging
type msgStream struct {
streamShutdown int32 // To be used atomically.
peer *Brontide
apply func(lnwire.Message)
startMsg string
stopMsg string
msgCond *sync.Cond
msgs []lnwire.Message
mtx sync.Mutex
producerSema chan struct{}
wg sync.WaitGroup
quit chan struct{}
}
// newMsgStream creates a new instance of a chanMsgStream for a particular
// channel identified by its channel ID. bufSize is the max number of messages
// that should be buffered in the internal queue. Callers should set this to a
// sane value that avoids blocking unnecessarily, but doesn't allow an
// unbounded amount of memory to be allocated to buffer incoming messages.
func newMsgStream(p *Brontide, startMsg, stopMsg string, bufSize uint32,
apply func(lnwire.Message)) *msgStream {