package routing
import (
"bytes"
"crypto/sha256"
"fmt"
"runtime"
"sort"
"sync"
"sync/atomic"
"time"
"github.com/btcsuite/btcd/btcec"
"github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btcutil"
"github.com/coreos/bbolt"
"github.com/davecgh/go-spew/spew"
"github.com/go-errors/errors"
"github.com/lightningnetwork/lightning-onion"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/htlcswitch"
"github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/multimutex"
"github.com/lightningnetwork/lnd/routing/chainview"
)
const (
// DefaultFinalCLTVDelta is the default value to be used as the final
// CLTV delta for a route if one is unspecified.
DefaultFinalCLTVDelta = 9
// defaultPayAttemptTimeout is a duration that we'll use to determine
// if we should give up on a payment attempt. This will be used if a
// value isn't specified in the LightningNode struct.
defaultPayAttemptTimeout = time.Second * 60
)
// ChannelGraphSource represents the source of information about the topology
// of the lightning network. It's responsible for the addition of nodes, edges,
// applying edge updates, and returning the current block height with which the
// topology is synchronized.
type ChannelGraphSource interface {
// AddNode is used to add information about a node to the router
// database. If the node with this pubkey is not present in an existing
// channel, it will be ignored.
AddNode(node *channeldb.LightningNode) error
// AddEdge is used to add an edge/channel to the topology of the router.
// Once all information about the channel has been gathered, this
// edge/channel may be used in the construction of payment paths.
AddEdge(edge *channeldb.ChannelEdgeInfo) error
// AddProof updates the channel edge info with proof which is needed to
// properly announce the edge to the rest of the network.
AddProof(chanID lnwire.ShortChannelID, proof *channeldb.ChannelAuthProof) error
// UpdateEdge is used to update edge information. Without this update,
// the edge is considered not fully constructed.
UpdateEdge(policy *channeldb.ChannelEdgePolicy) error
// IsStaleNode returns true if the graph source has a node announcement
// for the target node with a more recent timestamp. This method will
// also return true if we don't have an active channel announcement for
// the target node.
IsStaleNode(node Vertex, timestamp time.Time) bool
// IsKnownEdge returns true if the graph source already knows of the
// passed channel ID.
IsKnownEdge(chanID lnwire.ShortChannelID) bool
// IsStaleEdgePolicy returns true if the graph source has a channel
// edge for the passed channel ID (and flags) with a more recent
// timestamp.
IsStaleEdgePolicy(chanID lnwire.ShortChannelID, timestamp time.Time,
flags lnwire.ChanUpdateFlag) bool
// ForAllOutgoingChannels is used to iterate over all channels
// emanating from the "source" node which is the center of the
// star-graph.
ForAllOutgoingChannels(cb func(c *channeldb.ChannelEdgeInfo,
e *channeldb.ChannelEdgePolicy) error) error
// CurrentBlockHeight returns the block height from POV of the router
// subsystem.
CurrentBlockHeight() (uint32, error)
// GetChannelByID returns the channel identified by the passed channel ID.
GetChannelByID(chanID lnwire.ShortChannelID) (*channeldb.ChannelEdgeInfo,
*channeldb.ChannelEdgePolicy, *channeldb.ChannelEdgePolicy, error)
// ForEachNode is used to iterate over every node in the known graph.
ForEachNode(func(node *channeldb.LightningNode) error) error
// ForEachChannel is used to iterate over every channel in the known
// graph.
ForEachChannel(func(chanInfo *channeldb.ChannelEdgeInfo,
e1, e2 *channeldb.ChannelEdgePolicy) error) error
}
// FeeSchema is the set fee configuration for a Lightning Node on the network.
// Using the coefficients described within the schema, the required fee to
// forward outgoing payments can be derived.
type FeeSchema struct {
// BaseFee is the base amount of milli-satoshis that will be charged
// for ANY payment forwarded.
BaseFee lnwire.MilliSatoshi
// FeeRate is the rate that will be charged for forwarding payments.
// This value should be interpreted as the numerator for a fraction
// (fixed point arithmetic) whose denominator is 1 million. As a result
// the effective fee rate charged per mSAT will be: (amount *
// FeeRate/1,000,000).
FeeRate uint32
}
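// As a worked example of the schema above (an illustrative sketch only, not
// part of the original file, and forwardingFee is a hypothetical helper):
// forwarding 1,000,000 mSAT through a node with BaseFee=1000 and FeeRate=200
// costs 1000 + 1,000,000*200/1,000,000 = 1200 mSAT.
//
//	func forwardingFee(s FeeSchema, amt lnwire.MilliSatoshi) lnwire.MilliSatoshi {
//		// The proportional component uses fixed-point math with a
//		// denominator of 1,000,000.
//		return s.BaseFee + amt*lnwire.MilliSatoshi(s.FeeRate)/1000000
//	}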
// ChannelPolicy holds the parameters that determine the policy we enforce
// when forwarding payments on a channel. These parameters are communicated
// to the rest of the network in ChannelUpdate messages.
type ChannelPolicy struct {
// FeeSchema holds the fee configuration for a channel.
FeeSchema
// TimeLockDelta is the required HTLC timelock delta to be used
// when forwarding payments.
TimeLockDelta uint32
}
// Config defines the configuration for the ChannelRouter. ALL elements within
// the configuration MUST be non-nil for the ChannelRouter to carry out its
// duties.
type Config struct {
// Graph is the channel graph that the ChannelRouter will use to gather
// metrics from and also to carry out path finding queries.
// TODO(roasbeef): make into an interface
Graph *channeldb.ChannelGraph
// Chain is the router's source to the most up-to-date blockchain data.
// All incoming advertised channels will be checked against the chain
// to ensure that the channels advertised are still open.
Chain lnwallet.BlockChainIO
// ChainView is an instance of a FilteredChainView which is used to
// watch the sub-set of the UTXO set (the set of active channels) that
// we need in order to properly maintain the channel graph.
ChainView chainview.FilteredChainView
// SendToSwitch is a function that directs a link-layer switch to
// forward a fully encoded payment to the first hop in the route
// denoted by its public key. A non-nil error is to be returned if the
// payment was unsuccessful.
SendToSwitch func(firstHop lnwire.ShortChannelID,
htlcAdd *lnwire.UpdateAddHTLC,
circuit *sphinx.Circuit) ([sha256.Size]byte, error)
// ChannelPruneExpiry is the duration used to determine if a channel
// should be pruned or not. If the delta between now and when the
// channel was last updated is greater than ChannelPruneExpiry, then
// the channel is marked as a zombie channel eligible for pruning.
ChannelPruneExpiry time.Duration
// GraphPruneInterval is used as an interval to determine how often we
// should examine the channel graph to garbage collect zombie channels.
GraphPruneInterval time.Duration
// QueryBandwidth is a method that allows the router to query the lower
// link layer to determine the up to date available bandwidth at a
// prospective link to be traversed. If the link isn't available, then
// a value of zero should be returned. Otherwise, the current up to
// date knowledge of the available bandwidth of the link should be
// returned.
QueryBandwidth func(edge *channeldb.ChannelEdgeInfo) lnwire.MilliSatoshi
}
// routeTuple is an entry within the ChannelRouter's route cache. We cache
// prospective routes based on first the destination, and then the target
// amount. We require the target amount as it will influence the available
// set of paths for a payment.
type routeTuple struct {
amt lnwire.MilliSatoshi
dest [33]byte
}
// newRouteTuple creates a new route tuple from the target and amount.
func newRouteTuple(amt lnwire.MilliSatoshi, dest []byte) routeTuple {
r := routeTuple{
amt: amt,
}
copy(r.dest[:], dest)
return r
}
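// A hypothetical usage sketch (not part of the original file): the tuple
// serves as the composite key into the route cache, so payments to the same
// destination for different amounts occupy distinct cache entries.
//
//	tuple := newRouteTuple(amt, destPubKeyBytes)
//	r.routeCacheMtx.RLock()
//	routes, ok := r.routeCache[tuple]
//	r.routeCacheMtx.RUnlock()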
// ChannelRouter is the layer 3 router within the Lightning stack. Below the
// ChannelRouter is the HtlcSwitch, and below that is the Bitcoin blockchain
// itself. The primary role of the ChannelRouter is to respond to queries for
// potential routes that can support a payment amount, and also general graph
// reachability questions. The router will prune the channel graph
// automatically as new blocks are discovered which spend certain known funding
// outpoints, thereby closing their respective channels.
type ChannelRouter struct {
ntfnClientCounter uint64 // To be used atomically.
started uint32 // To be used atomically.
stopped uint32 // To be used atomically.
bestHeight uint32 // To be used atomically.
// cfg is a copy of the configuration struct that the ChannelRouter was
// initialized with.
cfg *Config
// selfNode is the center of the star-graph centered around the
// ChannelRouter. The ChannelRouter uses this node as a starting point
// when doing any path finding.
selfNode *channeldb.LightningNode
// routeCache is a map that caches the k-shortest paths from ourselves
// to a given target destination for a particular payment amount. This
// map is used as an optimization to speed up subsequent payments to a
// particular destination. This map will be cleared each time a new
// channel announcement is accepted, or a new block arrives that
// results in channels being closed.
//
// TODO(roasbeef): make LRU
routeCacheMtx sync.RWMutex
routeCache map[routeTuple][]*Route
// newBlocks is a channel in which new blocks connected to the end of
// the main chain are sent over, and blocks updated after a call to
// UpdateFilter.
newBlocks <-chan *chainview.FilteredBlock
// staleBlocks is a channel in which blocks disconnected from the end
// of our currently known best chain are sent over.
staleBlocks <-chan *chainview.FilteredBlock
// networkUpdates is a channel that carries new topology updates
// messages from outside the ChannelRouter to be processed by the
// networkHandler.
networkUpdates chan *routingMsg
// topologyClients maps a client's unique notification ID to a
// topologyClient client that contains its notification dispatch
// channel.
topologyClients map[uint64]*topologyClient
// ntfnClientUpdates is a channel that's used to send new updates to
// topology notification clients to the ChannelRouter. Updates either
// add a new notification client, or cancel notifications for an
// existing client.
ntfnClientUpdates chan *topologyClientUpdate
// missionControl is a shared memory of sorts that executions of
// payment path finding use in order to remember which vertexes/edges
// were pruned from prior attempts. During SendPayment execution,
// errors sent by nodes are mapped into a vertex or edge to be pruned.
// Each run will then take into account this set of pruned
// vertexes/edges to reduce route failure and pass on graph information
// gained to the next execution.
missionControl *missionControl
// channelEdgeMtx is a mutex we use to make sure we process only one
// ChannelEdgePolicy at a time for a given channelID, to ensure
// consistency between the various database accesses.
channelEdgeMtx *multimutex.Mutex
rejectMtx sync.RWMutex
rejectCache map[uint64]struct{}
sync.RWMutex
quit chan struct{}
wg sync.WaitGroup
}
// A compile time check to ensure ChannelRouter implements the
// ChannelGraphSource interface.
var _ ChannelGraphSource = (*ChannelRouter)(nil)
// New creates a new instance of the ChannelRouter with the specified
// configuration parameters. As part of initialization, if the router detects
// that the channel graph isn't fully in sync with the latest UTXO set (since
// the channel graph is a subset of the UTXO set), then the router will proceed
// to fully sync to the latest state of the UTXO set.
func New(cfg Config) (*ChannelRouter, error) {
selfNode, err := cfg.Graph.SourceNode()
if err != nil {
return nil, err
}
r := &ChannelRouter{
cfg: &cfg,
networkUpdates: make(chan *routingMsg),
topologyClients: make(map[uint64]*topologyClient),
ntfnClientUpdates: make(chan *topologyClientUpdate),
channelEdgeMtx: multimutex.NewMutex(),
selfNode: selfNode,
routeCache: make(map[routeTuple][]*Route),
rejectCache: make(map[uint64]struct{}),
quit: make(chan struct{}),
}
r.missionControl = newMissionControl(
cfg.Graph, selfNode, cfg.QueryBandwidth,
)
return r, nil
}
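// A hypothetical wiring sketch (illustrative only; the surrounding daemon
// supplies real implementations for each Config field, and all values shown
// are placeholders):
//
//	router, err := routing.New(routing.Config{
//		Graph:              graph,
//		Chain:              chainIO,
//		ChainView:          chainView,
//		SendToSwitch:       sendToSwitch,
//		ChannelPruneExpiry: 14 * 24 * time.Hour,
//		GraphPruneInterval: time.Hour,
//		QueryBandwidth:     queryBandwidth,
//	})
//	if err != nil {
//		// Handle construction failure, e.g. a missing source node.
//	}
//	if err := router.Start(); err != nil {
//		// Handle startup failure.
//	}
//	defer router.Stop()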
// Start launches all the goroutines the ChannelRouter requires to carry out
// its duties. If the router has already been started, then this method is a
// noop.
func (r *ChannelRouter) Start() error {
if !atomic.CompareAndSwapUint32(&r.started, 0, 1) {
return nil
}
log.Tracef("Channel Router starting")
// First, we'll start the chain view instance (if it isn't already
// started).
if err := r.cfg.ChainView.Start(); err != nil {
return err
}
// Once the instance is active, we'll fetch the channel we'll receive
// notifications over.
r.newBlocks = r.cfg.ChainView.FilteredBlocks()
r.staleBlocks = r.cfg.ChainView.DisconnectedBlocks()
bestHash, bestHeight, err := r.cfg.Chain.GetBestBlock()
if err != nil {
return err
}
if _, _, err := r.cfg.Graph.PruneTip(); err != nil {
switch {
// If the graph has never been pruned, or hasn't fully been
// created yet, then we don't treat this as an explicit error.
case err == channeldb.ErrGraphNeverPruned:
fallthrough
case err == channeldb.ErrGraphNotFound:
// If the graph has never been pruned, then we'll set
// the prune height to the current best height of the
// chain backend.
_, err = r.cfg.Graph.PruneGraph(
nil, bestHash, uint32(bestHeight),
)
if err != nil {
return err
}
default:
return err
}
}
// Before we perform our manual block pruning, we'll construct and
// apply a fresh chain filter to the active FilteredChainView instance.
// We do this before, as otherwise we may miss on-chain events as the
// filter hasn't properly been applied.
channelView, err := r.cfg.Graph.ChannelView()
if err != nil && err != channeldb.ErrGraphNoEdgesFound {
return err
}
log.Infof("Filtering chain using %v channels active", len(channelView))
if len(channelView) != 0 {
err = r.cfg.ChainView.UpdateFilter(
channelView, uint32(bestHeight),
)
if err != nil {
return err
}
}
// Before we begin normal operation of the router, we first need to
// synchronize the channel graph to the latest state of the UTXO set.
if err := r.syncGraphWithChain(); err != nil {
return err
}
r.wg.Add(1)
go r.networkHandler()
return nil
}
// Stop signals the ChannelRouter to gracefully halt all routines. This method
// will *block* until all goroutines have exited. If the channel router has
// already stopped then this method will return immediately.
func (r *ChannelRouter) Stop() error {
if !atomic.CompareAndSwapUint32(&r.stopped, 0, 1) {
return nil
}
log.Infof("Channel Router shutting down")
if err := r.cfg.ChainView.Stop(); err != nil {
return err
}
close(r.quit)
r.wg.Wait()
return nil
}
// syncGraphWithChain attempts to synchronize the current channel graph with
// the latest UTXO set state. This process involves pruning from the channel
// graph any channels which have been closed by spending their funding output
// since we've been down.
func (r *ChannelRouter) syncGraphWithChain() error {
// First, we'll need to check to see if we're already in sync with the
// latest state of the UTXO set.
bestHash, bestHeight, err := r.cfg.Chain.GetBestBlock()
if err != nil {
return err
}
r.bestHeight = uint32(bestHeight)
pruneHash, pruneHeight, err := r.cfg.Graph.PruneTip()
if err != nil {
switch {
// If the graph has never been pruned, or hasn't fully been
// created yet, then we don't treat this as an explicit error.
case err == channeldb.ErrGraphNeverPruned:
case err == channeldb.ErrGraphNotFound:
default:
return err
}
}
log.Infof("Prune tip for Channel Graph: height=%v, hash=%v", pruneHeight,
pruneHash)
switch {
// If the graph has never been pruned, then we can exit early as this
// entails it's being created for the first time and hasn't seen any
// block or created channels.
case pruneHeight == 0 || pruneHash == nil:
return nil
// If the block hashes and heights match exactly, then we don't need to
// prune the channel graph as we're already fully in sync.
case bestHash.IsEqual(pruneHash) && uint32(bestHeight) == pruneHeight:
return nil
}
// If the main chain blockhash at prune height is different from the
// prune hash, this might indicate the database is on a stale branch.
mainBlockHash, err := r.cfg.Chain.GetBlockHash(int64(pruneHeight))
if err != nil {
return err
}
// While we are on a stale branch of the chain, walk backwards to find
// first common block.
for !pruneHash.IsEqual(mainBlockHash) {
log.Infof("channel graph is stale. Disconnecting block %v "+
"(hash=%v)", pruneHeight, pruneHash)
// Prune the graph for every channel that was opened at height
// >= pruneHeight.
_, err := r.cfg.Graph.DisconnectBlockAtHeight(pruneHeight)
if err != nil {
return err
}
pruneHash, pruneHeight, err = r.cfg.Graph.PruneTip()
if err != nil {
switch {
// If at this point the graph has never been pruned, we
// can exit as this entails we are back to the point
// where it hasn't seen any block or created channels,
// so there's nothing left to prune.
case err == channeldb.ErrGraphNeverPruned:
return nil
case err == channeldb.ErrGraphNotFound:
return nil
default:
return err
}
}
mainBlockHash, err = r.cfg.Chain.GetBlockHash(int64(pruneHeight))
if err != nil {
return err
}
}
log.Infof("Syncing channel graph from height=%v (hash=%v) to height=%v "+
"(hash=%v)", pruneHeight, pruneHash, bestHeight, bestHash)
// If we're not yet caught up, then we'll walk forward in the chain,
// pruning the channel graph with each new block in the chain that
// hasn't yet been consumed by the channel graph.
var numChansClosed uint32
for nextHeight := pruneHeight + 1; nextHeight <= uint32(bestHeight); nextHeight++ {
// Using the next height, request a manual block pruning from
// the chainview for the particular block hash.
nextHash, err := r.cfg.Chain.GetBlockHash(int64(nextHeight))
if err != nil {
return err
}
filterBlock, err := r.cfg.ChainView.FilterBlock(nextHash)
if err != nil {
return err
}
// We're only interested in all prior outputs that have been
// spent in the block, so collate all the referenced previous
// outpoints within each tx and input.
var spentOutputs []*wire.OutPoint
for _, tx := range filterBlock.Transactions {
for _, txIn := range tx.TxIn {
spentOutputs = append(spentOutputs,
&txIn.PreviousOutPoint)
}
}
// With the spent outputs gathered, attempt to prune the
// channel graph, also passing in the hash+height of the block
// being pruned so the prune tip can be updated.
closedChans, err := r.cfg.Graph.PruneGraph(spentOutputs,
nextHash,
nextHeight)
if err != nil {
return err
}
numClosed := uint32(len(closedChans))
log.Infof("Block %v (height=%v) closed %v channels",
nextHash, nextHeight, numClosed)
numChansClosed += numClosed
}
log.Infof("Graph pruning complete: %v channels were closed since "+
"height %v", numChansClosed, pruneHeight)
return nil
}
// pruneZombieChans is a method that will be called periodically to prune out
// any "zombie" channels. We consider channels zombies if *both* edges haven't
// been updated since our zombie horizon. We do this periodically to keep a
// healthy, lively routing table.
func (r *ChannelRouter) pruneZombieChans() error {
var chansToPrune []wire.OutPoint
chanExpiry := r.cfg.ChannelPruneExpiry
log.Infof("Examining Channel Graph for zombie channels")
// First, we'll collect all the channels which are eligible for garbage
// collection due to being zombies.
filterPruneChans := func(info *channeldb.ChannelEdgeInfo,
e1, e2 *channeldb.ChannelEdgePolicy) error {
// We'll ensure that we don't attempt to prune our *own*
// channels from the graph, as in any case this should be
// re-advertised by the sub-system above us.
if info.NodeKey1Bytes == r.selfNode.PubKeyBytes ||
info.NodeKey2Bytes == r.selfNode.PubKeyBytes {
return nil
}
// If *both* edges haven't been updated for a period of
// chanExpiry, then we'll mark the channel itself as eligible
// for graph pruning.
e1Zombie, e2Zombie := true, true
if e1 != nil {
e1Zombie = time.Since(e1.LastUpdate) >= chanExpiry
if e1Zombie {
log.Tracef("Edge #1 of ChannelPoint(%v) "+
"last update: %v",
info.ChannelPoint, e1.LastUpdate)
}
}
if e2 != nil {
e2Zombie = time.Since(e2.LastUpdate) >= chanExpiry
if e2Zombie {
log.Tracef("Edge #2 of ChannelPoint(%v) "+
"last update: %v",
info.ChannelPoint, e2.LastUpdate)
}
}
if e1Zombie && e2Zombie {
log.Debugf("ChannelPoint(%v) is a zombie, collecting "+
"to prune", info.ChannelPoint)
// TODO(roasbeef): add ability to delete single
// directional edge
chansToPrune = append(chansToPrune, info.ChannelPoint)
// As we're detecting this as a zombie channel, we'll
// add this to the set of recently rejected items so we
// don't re-accept it shortly after.
r.rejectCache[info.ChannelID] = struct{}{}
}
return nil
}
r.rejectMtx.Lock()
defer r.rejectMtx.Unlock()
err := r.cfg.Graph.ForEachChannel(filterPruneChans)
if err != nil {
return fmt.Errorf("Unable to filter local zombie "+
"chans: %v", err)
}
log.Infof("Pruning %v Zombie Channels", len(chansToPrune))
// With the set of zombie-like channels obtained, we'll do another pass to
// delete all zombie channels from the channel graph.
for _, chanToPrune := range chansToPrune {
log.Tracef("Pruning zombie chan ChannelPoint(%v)", chanToPrune)
err := r.cfg.Graph.DeleteChannelEdge(&chanToPrune)
if err != nil {
return fmt.Errorf("Unable to prune zombie "+
"chans: %v", err)
}
}
return nil
}
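// The zombie criterion applied above, restated as a standalone predicate (an
// illustrative sketch only; edgeIsZombie is a hypothetical helper, not part
// of the original file). A channel is collected for pruning only when every
// one of its advertised edges is stale:
//
//	func edgeIsZombie(e *channeldb.ChannelEdgePolicy, expiry time.Duration) bool {
//		// A missing edge counts as stale, matching the default
//		// e1Zombie/e2Zombie = true above.
//		return e == nil || time.Since(e.LastUpdate) >= expiry
//	}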
// networkHandler is the primary goroutine for the ChannelRouter. The roles of
// this goroutine include answering queries related to the state of the
// network, pruning the graph on new block notification, applying network
// updates, and registering new topology clients.
//
// NOTE: This MUST be run as a goroutine.
func (r *ChannelRouter) networkHandler() {
defer r.wg.Done()
graphPruneTicker := time.NewTicker(r.cfg.GraphPruneInterval)
defer graphPruneTicker.Stop()
// We'll use this validation barrier to ensure that we process all jobs
// in the proper order during parallel validation.
validationBarrier := NewValidationBarrier(runtime.NumCPU()*4, r.quit)
for {
select {
// A new fully validated network update has just arrived. As a
// result we'll modify the channel graph accordingly depending
// on the exact type of the message.
case update := <-r.networkUpdates:
// We'll set up any dependants, and wait until a free
// slot for this job opens up; this allows us to avoid
// having thousands of goroutines active.
validationBarrier.InitJobDependencies(update.msg)
r.wg.Add(1)
go func() {
defer r.wg.Done()
defer validationBarrier.CompleteJob()
// If this message has an existing dependency,
// then we'll wait until that has been fully
// validated before we proceed.
err := validationBarrier.WaitForDependants(
update.msg,
)
if err != nil {
if err != ErrVBarrierShuttingDown {
log.Warnf("unexpected error "+
"during validation "+
"barrier shutdown: %v",
err)
}
return
}
// Process the routing update to determine if
// this is either a new update from our PoV or
// an update to a prior vertex/edge we
// previously accepted.
err = r.processUpdate(update.msg)
update.err <- err
// If this message had any dependencies, then
// we can now signal them to continue.
validationBarrier.SignalDependants(update.msg)
if err != nil {
return
}
// Send off a new notification for the newly
// accepted update.
topChange := &TopologyChange{}
err = addToTopologyChange(
r.cfg.Graph, topChange, update.msg,
)
if err != nil {
log.Errorf("unable to update topology "+
"change notification: %v", err)
return
}
if !topChange.isEmpty() {
r.notifyTopologyChange(topChange)
}
}()
// TODO(roasbeef): remove all unconnected vertexes
// after N blocks pass with no corresponding
// announcements.
case chainUpdate, ok := <-r.staleBlocks:
// If the channel has been closed, then this indicates
// the daemon is shutting down, so we exit ourselves.
if !ok {
return
}
// Since this block is stale, we update our best height
// to the previous block.
blockHeight := uint32(chainUpdate.Height)
atomic.StoreUint32(&r.bestHeight, blockHeight-1)
// Update the channel graph to reflect that this block
// was disconnected.
_, err := r.cfg.Graph.DisconnectBlockAtHeight(blockHeight)
if err != nil {
log.Errorf("unable to prune graph with stale "+
"block: %v", err)
continue
}
// Invalidate the route cache, as some channels might
// not be confirmed anymore.
r.routeCacheMtx.Lock()
r.routeCache = make(map[routeTuple][]*Route)
r.routeCacheMtx.Unlock()
// TODO(halseth): notify client about the reorg?
// A new block has arrived, so we can prune the channel graph
// of any channels which were closed in the block.
case chainUpdate, ok := <-r.newBlocks:
// If the channel has been closed, then this indicates
// the daemon is shutting down, so we exit ourselves.
if !ok {
return
}
// We'll ensure that any new blocks received attach
// directly to the end of our main chain. If not, then
// we've somehow missed some blocks. We don't process
// this block as otherwise, we may miss on-chain
// events.
currentHeight := atomic.LoadUint32(&r.bestHeight)
if chainUpdate.Height != currentHeight+1 {
log.Errorf("out of order block: expecting "+
"height=%v, got height=%v", currentHeight+1,
chainUpdate.Height)
continue
}
// Once a new block arrives, we update our running
// track of the height of the chain tip.
blockHeight := uint32(chainUpdate.Height)
atomic.StoreUint32(&r.bestHeight, blockHeight)
log.Infof("Pruning channel graph using block %v (height=%v)",
chainUpdate.Hash, blockHeight)
// We're only interested in all prior outputs that have
// been spent in the block, so collate all the
// referenced previous outpoints within each tx and
// input.
var spentOutputs []*wire.OutPoint
for _, tx := range chainUpdate.Transactions {
for _, txIn := range tx.TxIn {
spentOutputs = append(spentOutputs,
&txIn.PreviousOutPoint)
}
}
// With the spent outputs gathered, attempt to prune
// the channel graph, also passing in the hash+height
// of the block being pruned so the prune tip can be
// updated.
chansClosed, err := r.cfg.Graph.PruneGraph(spentOutputs,
&chainUpdate.Hash, chainUpdate.Height)
if err != nil {
log.Errorf("unable to prune routing table: %v", err)
continue
}
log.Infof("Block %v (height=%v) closed %v channels",
chainUpdate.Hash, blockHeight, len(chansClosed))
// Invalidate the route cache as the block height has
// changed which will invalidate the HTLC timeouts we
// have crafted within each of the pre-computed routes.
//
// TODO(roasbeef): need to invalidate after each
// chan ann update?
// * can have map of chanID to routes involved, avoids
// full invalidation
r.routeCacheMtx.Lock()
r.routeCache = make(map[routeTuple][]*Route)
r.routeCacheMtx.Unlock()
if len(chansClosed) == 0 {
continue
}
// Notify all currently registered clients of the newly
// closed channels.
closeSummaries := createCloseSummaries(blockHeight, chansClosed...)
r.notifyTopologyChange(&TopologyChange{
ClosedChannels: closeSummaries,
})
// A new notification client update has arrived. We're either
// gaining a new client, or cancelling notifications for an
// existing client.
case ntfnUpdate := <-r.ntfnClientUpdates:
clientID := ntfnUpdate.clientID
if ntfnUpdate.cancel {
r.RLock()
client, ok := r.topologyClients[ntfnUpdate.clientID]
r.RUnlock()
if ok {
r.Lock()
delete(r.topologyClients, clientID)
r.Unlock()
close(client.exit)
client.wg.Wait()
close(client.ntfnChan)
}
continue
}
r.Lock()
r.topologyClients[ntfnUpdate.clientID] = &topologyClient{
ntfnChan: ntfnUpdate.ntfnChan,
exit: make(chan struct{}),
}
r.Unlock()
// The graph prune ticker has ticked, so we'll examine the
// state of the known graph to filter out any zombie channels
// for pruning.
case <-graphPruneTicker.C:
if err := r.pruneZombieChans(); err != nil {
log.Errorf("unable to prune zombies: %v", err)
}
// The router has been signalled to exit, so we exit our main
// loop so the wait group can be decremented.
case <-r.quit:
return
}
}
}
// assertNodeAnnFreshness returns a non-nil error if the passed announcement
// isn't fresh from the PoV of our local database. ErrIgnored will be returned
// if we don't know of the node at all (as we only track nodes with channels),
// and ErrOutdated will be returned if the timestamp we already have is equal
// to or newer than the passed timestamp.
func (r *ChannelRouter) assertNodeAnnFreshness(node Vertex,
msgTimestamp time.Time) error {
// If we are not already aware of this node, it means that we don't
// know about any channel using this node. To avoid a DoS attack by
// node announcements, we will ignore such nodes. If we do know about
// this node, check that this update brings info newer than what we
// already have.
lastUpdate, exists, err := r.cfg.Graph.HasLightningNode(node)
if err != nil {
return errors.Errorf("unable to query for the "+
"existence of node: %v", err)
}
if !exists {
return newErrf(ErrIgnored, "Ignoring node announcement"+
" for node not found in channel graph (%x)",
node[:])
}
// If we've reached this point then we're aware of the vertex being
// advertised. So we now check if the new message has a new time stamp,
// if not then we won't accept the new data as it would override newer
// data.
if !lastUpdate.Before(msgTimestamp) {
return newErrf(ErrOutdated, "Ignoring outdated "+
"announcement for %x", node[:])
}
return nil
}
// processUpdate processes a new authenticated network update, which may be a
// channel/edge announcement, a node announcement, or a channel/edge policy
// update. If the update doesn't affect the internal state of the router, due
// to being out of date, invalid, or redundant, then a non-nil error is
// returned.
func (r *ChannelRouter) processUpdate(msg interface{}) error {
var invalidateCache bool
switch msg := msg.(type) {
case *channeldb.LightningNode:
// Before we add the node to the database, we'll check to see
// if the announcement is "fresh" or not. If it isn't, then
// we'll return an error.
err := r.assertNodeAnnFreshness(msg.PubKeyBytes, msg.LastUpdate)
if err != nil {
return err
}
if err := r.cfg.Graph.AddLightningNode(msg); err != nil {
return errors.Errorf("unable to add node %v to the "+
"graph: %v", msg.PubKeyBytes, err)
}
log.Infof("Updated vertex data for node=%x", msg.PubKeyBytes)
case *channeldb.ChannelEdgeInfo:
// If we recently rejected this channel edge, then we won't
// attempt to re-process it.
r.rejectMtx.RLock()
if _, ok := r.rejectCache[msg.ChannelID]; ok {
r.rejectMtx.RUnlock()
return newErrf(ErrIgnored, "recently rejected "+
"chan_id=%v", msg.ChannelID)
}
r.rejectMtx.RUnlock()
// Prior to processing the announcement we first check if we
// already know of this channel, if so, then we can exit early.
_, _, exists, err := r.cfg.Graph.HasChannelEdge(msg.ChannelID)
if err != nil && err != channeldb.ErrGraphNoEdgesFound {
return errors.Errorf("unable to check for edge "+
"existence: %v", err)
} else if exists {
return newErrf(ErrIgnored, "Ignoring msg for known "+
"chan_id=%v", msg.ChannelID)
}
// Query the database for the existence of the two nodes in this
// channel. If not found, add a partial node to the database,
// containing only the node keys.
_, exists, _ = r.cfg.Graph.HasLightningNode(msg.NodeKey1Bytes)
if !exists {
node1 := &channeldb.LightningNode{
PubKeyBytes: msg.NodeKey1Bytes,
HaveNodeAnnouncement: false,
}
err := r.cfg.Graph.AddLightningNode(node1)
if err != nil {
return errors.Errorf("unable to add node %v to"+
" the graph: %v", node1.PubKeyBytes, err)
}
}
_, exists, _ = r.cfg.Graph.HasLightningNode(msg.NodeKey2Bytes)
if !exists {
node2 := &channeldb.LightningNode{
PubKeyBytes: msg.NodeKey2Bytes,
HaveNodeAnnouncement: false,
}
err := r.cfg.Graph.AddLightningNode(node2)
if err != nil {
return errors.Errorf("unable to add node %v to"+
" the graph: %v", node2.PubKeyBytes, err)
}
}
// Before we can add the channel to the channel graph, we need
// to obtain the full funding outpoint that's encoded within
// the channel ID.
channelID := lnwire.NewShortChanIDFromInt(msg.ChannelID)
fundingPoint, err := r.fetchChanPoint(&channelID)
if err != nil {
r.rejectMtx.Lock()
r.rejectCache[msg.ChannelID] = struct{}{}
r.rejectMtx.Unlock()
return errors.Errorf("unable to fetch chan point for "+
"chan_id=%v: %v", msg.ChannelID, err)
}
// Now that we have the funding outpoint of the channel, ensure
// that it hasn't yet been spent. If so, then this channel has
// been closed so we'll ignore it.
chanUtxo, err := r.cfg.Chain.GetUtxo(
fundingPoint, channelID.BlockHeight,
)
if err != nil {
r.rejectMtx.Lock()