-
Notifications
You must be signed in to change notification settings - Fork 2.1k
/
channel_arbitrator.go
2891 lines (2461 loc) · 92.9 KB
/
channel_arbitrator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
package contractcourt
import (
"bytes"
"errors"
"fmt"
"math"
"sync"
"sync/atomic"
"time"
"github.com/btcsuite/btcd/btcutil"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire"
"github.com/davecgh/go-spew/spew"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/channeldb/models"
"github.com/lightningnetwork/lnd/htlcswitch/hop"
"github.com/lightningnetwork/lnd/input"
"github.com/lightningnetwork/lnd/invoices"
"github.com/lightningnetwork/lnd/kvdb"
"github.com/lightningnetwork/lnd/labels"
"github.com/lightningnetwork/lnd/lntypes"
"github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/sweep"
)
var (
// errAlreadyForceClosed is the sentinel error returned when a force
// close is requested for a channel that is already in the process of
// being force closed.
errAlreadyForceClosed = errors.New("channel is already in the " +
"process of being force closed")
)
const (
// anchorSweepConfTarget is the conf target used when sweeping
// commitment anchors. This value is only used when the commitment
// transaction has no valid HTLCs for determining a confirmation
// deadline.
anchorSweepConfTarget = 144
// arbitratorBlockBufferSize is the size of the buffer we give to each
// channel arbitrator. NewChannelArbitrator uses it as the capacity of
// the arbitrator's blocks channel.
arbitratorBlockBufferSize = 20
)
// WitnessSubscription represents an intent to be notified once new witnesses
// are discovered by various active contract resolvers. A contract resolver may
// use this to be notified of when it can satisfy an incoming contract after we
// discover the witness for an outgoing contract.
type WitnessSubscription struct {
// WitnessUpdates is a receive-only channel over which newly discovered
// witnesses (preimages) will be sent.
//
// TODO(roasbeef): couple with WitnessType?
WitnessUpdates <-chan lntypes.Preimage
// CancelSubscription is a function closure that should be used by a
// client to cancel the subscription once they are no longer interested
// in receiving new updates.
CancelSubscription func()
}
// WitnessBeacon is a global beacon of witnesses. Contract resolvers will use
// this interface to lookup witnesses (preimages typically) of contracts
// they're trying to resolve, add new preimages they resolve, and finally
// receive new updates each time a preimage is discovered.
//
// TODO(roasbeef): need to delete the pre-images once we've used them
// and have been sufficiently confirmed?
type WitnessBeacon interface {
// SubscribeUpdates returns a subscription whose WitnessUpdates channel
// will be sent upon *each* time a new preimage is discovered.
SubscribeUpdates(chanID lnwire.ShortChannelID, htlc *channeldb.HTLC,
payload *hop.Payload,
nextHopOnionBlob []byte) (*WitnessSubscription, error)
// LookupPreimage attempts to lookup a preimage in the global cache.
// True is returned for the second return value if the preimage is
// found.
LookupPreimage(payhash lntypes.Hash) (lntypes.Preimage, bool)
// AddPreimages adds a batch of newly discovered preimages to the global
// cache, and also signals any subscribers of the newly discovered
// witness.
AddPreimages(preimages ...lntypes.Preimage) error
}
// ArbChannel is an abstraction that allows the channel arbitrator to interact
// with an open channel.
type ArbChannel interface {
// ForceCloseChan should force close the contract that this attendant
// is watching over. We'll use this when we decide that we need to go
// to chain. It should in addition tell the switch to remove the
// corresponding link, such that we won't accept any new updates. The
// returned summary contains all items needed to eventually resolve all
// outputs on chain.
ForceCloseChan() (*lnwallet.LocalForceCloseSummary, error)
// NewAnchorResolutions returns the anchor resolutions for the
// currently valid commitment transactions.
NewAnchorResolutions() (*lnwallet.AnchorResolutions, error)
}
// ChannelArbitratorConfig contains all the functionality that the
// ChannelArbitrator needs in order to properly arbitrate any contract dispute
// on chain.
type ChannelArbitratorConfig struct {
// ChanPoint is the channel point that uniquely identifies this
// channel.
ChanPoint wire.OutPoint
// Channel is the full channel data structure. For legacy channels, this
// field may not always be set after a restart.
Channel ArbChannel
// ShortChanID describes the exact location of the channel within the
// chain. We'll use this to address any messages that we need to send
// to the switch during contract resolution.
ShortChanID lnwire.ShortChannelID
// ChainEvents is an active subscription to the chain watcher for this
// channel to be notified of any on-chain activity related to this
// channel.
ChainEvents *ChainEventSubscription
// MarkCommitmentBroadcasted should mark the channel as the commitment
// being broadcast, and we are waiting for the commitment to confirm.
//
// NOTE(review): the meaning of the bool argument is not visible in
// this file section - confirm against the implementation.
MarkCommitmentBroadcasted func(*wire.MsgTx, bool) error
// MarkChannelClosed marks the channel closed in the database, with the
// passed close summary. After this method successfully returns we can
// no longer expect to receive chain events for this channel, and must
// be able to recover from a failure without getting the close event
// again. It takes an optional channel status which will update the
// channel status in the record that we keep of historical channels.
MarkChannelClosed func(*channeldb.ChannelCloseSummary,
...channeldb.ChannelStatus) error
// IsPendingClose is a boolean indicating whether the channel is marked
// as pending close in the database.
IsPendingClose bool
// ClosingHeight is the height at which the channel was closed. Note
// that this value is only valid if IsPendingClose is true.
ClosingHeight uint32
// CloseType is the type of the close event in case IsPendingClose is
// true. Otherwise this value is unset.
CloseType channeldb.ClosureType
// MarkChannelResolved is a function closure that serves to mark a
// channel as "fully resolved". A channel itself can be considered
// fully resolved once all active contracts have individually been
// fully resolved.
//
// TODO(roasbeef): need RPC's to combine for pendingchannels RPC
MarkChannelResolved func() error
// PutResolverReport records a resolver report for the channel. If the
// transaction provided is nil, the function should write the report
// in a new transaction.
PutResolverReport func(tx kvdb.RwTx,
report *channeldb.ResolverReport) error
// FetchHistoricalChannel retrieves the historical state of a channel.
// This is mostly used to supplement the ContractResolvers with
// additional information required for proper contract resolution.
FetchHistoricalChannel func() (*channeldb.OpenChannel, error)
// ChainArbitratorConfig is embedded so that its fields are reachable
// directly through this config. Presumably this is where the Clock and
// ChainIO members used by ChannelArbitrator.Start come from - confirm
// against the ChainArbitratorConfig definition.
ChainArbitratorConfig
}
// ReportOutputType describes the type of output that is being reported
// on. It is used as the Type field of a ContractReport.
type ReportOutputType uint8
const (
// ReportOutputIncomingHtlc is an incoming hash time locked contract on
// the commitment tx.
ReportOutputIncomingHtlc ReportOutputType = iota
// ReportOutputOutgoingHtlc is an outgoing hash time locked contract on
// the commitment tx.
ReportOutputOutgoingHtlc
// ReportOutputUnencumbered is an uncontested output on the commitment
// transaction paying to us directly.
ReportOutputUnencumbered
// ReportOutputAnchor is an anchor output on the commitment tx.
ReportOutputAnchor
)
// ContractReport provides a summary of a commitment tx output.
type ContractReport struct {
// Outpoint is the final output that will be swept back to the wallet.
Outpoint wire.OutPoint
// Type indicates the type of the reported output.
Type ReportOutputType
// Amount is the final value that will be swept back to the wallet.
Amount btcutil.Amount
// MaturityHeight is the absolute block height that this output will
// mature at.
MaturityHeight uint32
// Stage indicates whether the htlc is in the CLTV-timeout stage (1) or
// the CSV-delay stage (2). A stage 1 htlc's maturity height will be set
// to its expiry height, while a stage 2 htlc's maturity height will be
// set to its confirmation height plus the maturity requirement.
Stage uint32
// LimboBalance is the total number of frozen coins within this
// contract.
LimboBalance btcutil.Amount
// RecoveredBalance is the total value that has been successfully swept
// back to the user's wallet.
RecoveredBalance btcutil.Amount
}
// resolverReport builds a channeldb.ResolverReport for this contract. The
// outpoint and amount are copied from the contract report itself, while the
// spending transaction hash, the resolver type and the outcome are supplied by
// the caller.
func (c *ContractReport) resolverReport(spendTx *chainhash.Hash,
	resolverType channeldb.ResolverType,
	outcome channeldb.ResolverOutcome) *channeldb.ResolverReport {

	report := new(channeldb.ResolverReport)

	// Fields sourced from the contract report.
	report.OutPoint = c.Outpoint
	report.Amount = c.Amount

	// Fields sourced from the caller.
	report.ResolverType = resolverType
	report.ResolverOutcome = outcome
	report.SpendTxID = spendTx

	return report
}
// htlcSet represents the set of active HTLCs on a given commitment
// transaction. Both maps are keyed by the HTLC's HtlcIndex (see newHtlcSet).
type htlcSet struct {
// incomingHTLCs is a map of all incoming HTLCs on the target
// commitment transaction. We may potentially go onchain to claim the
// funds sent to us within this set.
incomingHTLCs map[uint64]channeldb.HTLC
// outgoingHTLCs is a map of all outgoing HTLCs on the target
// commitment transaction. We may potentially go onchain to reclaim the
// funds that are currently in limbo.
outgoingHTLCs map[uint64]channeldb.HTLC
}
// newHtlcSet partitions the passed slice of HTLCs into an htlcSet. Each HTLC
// is placed into the incoming or the outgoing map depending on its Incoming
// flag, keyed by its HtlcIndex. Both maps are always non-nil, even when the
// input is empty.
func newHtlcSet(htlcs []channeldb.HTLC) htlcSet {
	set := htlcSet{
		incomingHTLCs: make(map[uint64]channeldb.HTLC),
		outgoingHTLCs: make(map[uint64]channeldb.HTLC),
	}

	for i := range htlcs {
		entry := htlcs[i]

		// Select the destination map based on the HTLC's direction.
		target := set.outgoingHTLCs
		if entry.Incoming {
			target = set.incomingHTLCs
		}

		target[entry.HtlcIndex] = entry
	}

	return set
}
// HtlcSetKey is a two-tuple that uniquely identifies a set of HTLCs on a
// commitment transaction.
type HtlcSetKey struct {
	// IsRemote is true when the HTLCs live on the remote party's
	// commitment transaction.
	IsRemote bool

	// IsPending is true when the commitment transaction that the HTLCs
	// are on is pending (the higher of two unrevoked commitments).
	IsPending bool
}

var (
	// LocalHtlcSet is the HtlcSetKey used for local commitments.
	LocalHtlcSet = HtlcSetKey{}

	// RemoteHtlcSet is the HtlcSetKey used for remote commitments.
	RemoteHtlcSet = HtlcSetKey{IsRemote: true}

	// RemotePendingHtlcSet is the HtlcSetKey used for dangling remote
	// commitment transactions.
	RemotePendingHtlcSet = HtlcSetKey{IsRemote: true, IsPending: true}
)

// String returns a human readable name for the target HtlcSetKey. Keys that
// do not match one of the three well-known sets are rendered as
// "unknown HtlcSetKey".
func (h HtlcSetKey) String() string {
	if h == LocalHtlcSet {
		return "LocalHtlcSet"
	}
	if h == RemoteHtlcSet {
		return "RemoteHtlcSet"
	}
	if h == RemotePendingHtlcSet {
		return "RemotePendingHtlcSet"
	}

	return "unknown HtlcSetKey"
}
// ChannelArbitrator is the on-chain arbitrator for a particular channel. The
// struct will keep in sync with the current set of HTLCs on the commitment
// transaction. The job of the attendant is to go on-chain to either settle or
// cancel an HTLC as necessary iff: an HTLC times out, or we know the
// pre-image to an HTLC, but it wasn't settled by the link off-chain. The
// ChannelArbitrator will factor in an expected confirmation delta when
// broadcasting to ensure that we avoid any possibility of race conditions, and
// sweep the output(s) without contest.
type ChannelArbitrator struct {
started int32 // To be used atomically.
stopped int32 // To be used atomically.
// startTimestamp is the time when this ChannelArbitrator was started.
startTimestamp time.Time
// log is a persistent log that the attendant will use to checkpoint
// its next action, and the state of any unresolved contracts.
log ArbitratorLog
// activeHTLCs is the set of active incoming/outgoing HTLC's on all
// currently valid commitment transactions.
activeHTLCs map[HtlcSetKey]htlcSet
// unmergedSet is used to update the activeHTLCs map in two callsites:
// checkLocalChainActions and sweepAnchors. It contains the latest
// updates from the link. It is not deleted from, its entries may be
// replaced on subsequent calls to notifyContractUpdate.
unmergedSet map[HtlcSetKey]htlcSet
// unmergedMtx presumably guards unmergedSet - confirm against the
// callsites that read and write the map.
unmergedMtx sync.RWMutex
// cfg contains all the functionality that the ChannelArbitrator requires
// to do its duty.
cfg ChannelArbitratorConfig
// blocks is a channel that the arbitrator will receive new blocks on.
// This channel should be buffered so that it does not block the
// sender.
blocks chan int32
// signalUpdates is a channel that any new live signals for the channel
// we're watching over will be sent.
signalUpdates chan *signalUpdateMsg
// activeResolvers is a slice of any active resolvers. This is used to
// be able to signal them for shutdown in the case that we shutdown.
activeResolvers []ContractResolver
// activeResolversLock prevents simultaneous read and write to the
// resolvers slice.
activeResolversLock sync.RWMutex
// resolutionSignal is a channel that will be sent upon by contract
// resolvers once their contract has been fully resolved. With each
// send, we'll check to see if the contract is fully resolved.
resolutionSignal chan struct{}
// forceCloseReqs is a channel that requests to forcibly close the
// contract will be sent over.
forceCloseReqs chan *forceCloseReq
// state is the current state of the arbitrator. This state is examined
// upon start up to decide which actions to take.
state ArbitratorState
// wg tracks the goroutines launched by the arbitrator; Stop waits on
// it before returning.
wg sync.WaitGroup
// quit is closed by Stop to signal shutdown.
quit chan struct{}
}
// NewChannelArbitrator returns a new instance of a ChannelArbitrator backed by
// the passed config struct. The passed htlcSets map is used directly as the
// arbitrator's active HTLC view, and the passed log becomes its persistent
// arbitrator log.
func NewChannelArbitrator(cfg ChannelArbitratorConfig,
	htlcSets map[HtlcSetKey]htlcSet, log ArbitratorLog) *ChannelArbitrator {

	// The unmerged set lives in its own map: its values get overwritten
	// later on, and we don't want those writes to touch activeHTLCs
	// directly. This soft copy ensures that activeHTLCs isn't reset to an
	// empty map down the line. The local and remote entries are always
	// written, while the pending remote entry is only carried over when it
	// is present in the source map.
	unmerged := map[HtlcSetKey]htlcSet{
		LocalHtlcSet:  htlcSets[LocalHtlcSet],
		RemoteHtlcSet: htlcSets[RemoteHtlcSet],
	}
	if pending, ok := htlcSets[RemotePendingHtlcSet]; ok {
		unmerged[RemotePendingHtlcSet] = pending
	}

	arb := &ChannelArbitrator{
		cfg:         cfg,
		log:         log,
		activeHTLCs: htlcSets,
		unmergedSet: unmerged,
	}

	// Only the blocks channel is buffered, the remaining channels are
	// synchronous.
	arb.blocks = make(chan int32, arbitratorBlockBufferSize)
	arb.signalUpdates = make(chan *signalUpdateMsg)
	arb.resolutionSignal = make(chan struct{})
	arb.forceCloseReqs = make(chan *forceCloseReq)
	arb.quit = make(chan struct{})

	return arb
}
// chanArbStartState contains the information from disk that we need to start
// up a channel arbitrator.
type chanArbStartState struct {
// currentState is the arbitrator state last persisted in the
// arbitrator log.
currentState ArbitratorState
// commitSet is the confirmed commitment set read from the arbitrator
// log. getStartState tolerates the log having no commit set, so this
// is presumably nil in that case - confirm against the ArbitratorLog
// implementation.
commitSet *CommitSet
}
// getStartState loads from the arbitrator log the information a channel
// arbitrator needs in order to start: the last persisted state and the
// confirmed commitment set. The passed transaction is handed through to the
// log as-is.
func (c *ChannelArbitrator) getStartState(tx kvdb.RTx) (*chanArbStartState,
	error) {

	// Our last state on disk tells the internal state machine where to
	// pick up from.
	currentState, err := c.log.CurrentState(tx)
	if err != nil {
		return nil, err
	}

	// The confirmed commitment set only exists if the channel has been
	// closed out on chain for modern nodes. Older nodes won't have it at
	// all and rely on the existing written chain actions instead. It is
	// also absent if this channel hasn't logged any actions yet. Both of
	// those cases are therefore not treated as failures.
	confirmedSet, err := c.log.FetchConfirmedCommitSet(tx)
	switch err {
	case nil, errNoCommitSet, errScopeBucketNoExist:
		// Nothing to do, fall through to building the start state.

	default:
		return nil, err
	}

	startState := &chanArbStartState{
		currentState: currentState,
		commitSet:    confirmedSet,
	}

	return startState, nil
}
// Start starts all the goroutines that the ChannelArbitrator needs to operate.
// It takes a start state, which will be looked up on disk if it is not
// provided.
//
// Only the first call does any work, any subsequent call is a no-op that
// returns nil. Before the channelAttendant goroutine is launched, the state
// machine is advanced once based on the current chain height (or the closing
// height if the channel is already pending close), and any unresolved
// contract resolvers are relaunched if we both start and end in
// StateWaitingFullResolution.
func (c *ChannelArbitrator) Start(state *chanArbStartState) error {
	if !atomic.CompareAndSwapInt32(&c.started, 0, 1) {
		return nil
	}
	c.startTimestamp = c.cfg.Clock.Now()

	// If the state passed in is nil, we look it up now.
	if state == nil {
		var err error
		state, err = c.getStartState(nil)
		if err != nil {
			return err
		}
	}

	log.Debugf("Starting ChannelArbitrator(%v), htlc_set=%v, state=%v",
		c.cfg.ChanPoint, newLogClosure(func() string {
			return spew.Sdump(c.activeHTLCs)
		}), state.currentState,
	)

	// Set our state from our starting state.
	c.state = state.currentState

	_, bestHeight, err := c.cfg.ChainIO.GetBestBlock()
	if err != nil {
		return err
	}

	// If the channel has been marked pending close in the database, and we
	// haven't transitioned the state machine to StateContractClosed (or a
	// succeeding state), then a state transition most likely failed. We'll
	// try to recover from this by manually advancing the state by setting
	// the corresponding close trigger.
	trigger := chainTrigger
	triggerHeight := uint32(bestHeight)
	if c.cfg.IsPendingClose {
		switch c.state {
		case StateDefault, StateBroadcastCommit,
			StateCommitmentBroadcasted:

			// Map the persisted close type onto the trigger that
			// would have been fired had the transition not
			// stalled. Any other close type leaves the default
			// chain trigger in place.
			switch c.cfg.CloseType {
			case channeldb.CooperativeClose:
				trigger = coopCloseTrigger

			case channeldb.BreachClose:
				trigger = breachCloseTrigger

			case channeldb.LocalForceClose:
				trigger = localCloseTrigger

			case channeldb.RemoteForceClose:
				trigger = remoteCloseTrigger
			}

			log.Warnf("ChannelArbitrator(%v): detected stalled "+
				"state=%v for closed channel",
				c.cfg.ChanPoint, c.state)
		}

		triggerHeight = c.cfg.ClosingHeight
	}

	log.Infof("ChannelArbitrator(%v): starting state=%v, trigger=%v, "+
		"triggerHeight=%v", c.cfg.ChanPoint, c.state, trigger,
		triggerHeight)

	// We'll now attempt to advance our state forward based on the current
	// on-chain state, and our set of active contracts.
	startingState := c.state
	nextState, _, err := c.advanceState(
		triggerHeight, trigger, state.commitSet,
	)
	if err != nil {
		switch err {
		// If we detect that we tried to fetch resolutions, but failed,
		// this channel was marked closed in the database before the
		// resolutions were successfully written. In this case there is
		// not much we can do, so we don't return the error.
		case errScopeBucketNoExist, errNoResolutions:
			log.Warnf("ChannelArbitrator(%v): detected closed "+
				"channel with no contract resolutions written.",
				c.cfg.ChanPoint)

		default:
			return err
		}
	}

	// If we start and ended at the awaiting full resolution state, then
	// we'll relaunch our set of unresolved contracts.
	if startingState == StateWaitingFullResolution &&
		nextState == StateWaitingFullResolution {

		// In order to relaunch the resolvers, we'll need to fetch the
		// set of HTLCs that were present in the commitment transaction
		// at the time it was confirmed. commitSet.ConfCommitKey can't
		// be nil at this point since we're in
		// StateWaitingFullResolution. We can only be in
		// StateWaitingFullResolution after we've transitioned from
		// StateContractClosed which can only be triggered by the local
		// or remote close trigger. This trigger is only fired when we
		// receive a chain event from the chain watcher that the
		// commitment has been confirmed on chain, and before we
		// advance our state step, we call InsertConfirmedCommitSet.
		err := c.relaunchResolvers(state.commitSet, triggerHeight)
		if err != nil {
			return err
		}
	}

	c.wg.Add(1)
	go c.channelAttendant(bestHeight)

	return nil
}
// relaunchResolvers relaunches the set of resolvers for unresolved contracts in
// order to provide them with information that's not immediately available upon
// starting the ChannelArbitrator. This information should ideally be stored in
// the database, so this only serves as an intermediate work-around to prevent
// a migration.
//
// commitSet is the confirmed commitment set, if one is available. When it is
// nil, or when it carries no confirmed commitment key, the HTLC information is
// taken from the chain action log instead. heightHint is passed on to the
// re-created anchor resolver.
//
// An error is returned if any of the log lookups fail, if the historical
// channel state can't be fetched for a reason other than it not existing, or
// if an unresolved HTLC resolver has no matching HTLC to be supplemented with.
func (c *ChannelArbitrator) relaunchResolvers(commitSet *CommitSet,
	heightHint uint32) error {

	// We'll now query our log to see if there are any active unresolved
	// contracts. If this is the case, then we'll relaunch all contract
	// resolvers.
	unresolvedContracts, err := c.log.FetchUnresolvedContracts()
	if err != nil {
		return err
	}

	// Retrieve the commitment tx hash from the log.
	contractResolutions, err := c.log.FetchContractResolutions()
	if err != nil {
		log.Errorf("unable to fetch contract resolutions: %v",
			err)
		return err
	}
	commitHash := contractResolutions.CommitHash

	// In prior versions of lnd, the information needed to supplement the
	// resolvers (in most cases, the full amount of the HTLC) was found in
	// the chain action map, which is now deprecated. As a result, if the
	// commitSet is nil (an older node with unresolved HTLCs at time of
	// upgrade), then we'll use the chain action information in place. The
	// chain actions may exclude some information, but we cannot recover it
	// for these older nodes at the moment.
	//
	// We also take the chain action path if the commit set is present but
	// has no confirmed commitment key, as dereferencing a nil key would
	// otherwise crash the arbitrator on startup.
	var confirmedHTLCs []channeldb.HTLC
	if commitSet != nil && commitSet.ConfCommitKey != nil {
		confirmedHTLCs = commitSet.HtlcSets[*commitSet.ConfCommitKey]
	} else {
		chainActions, err := c.log.FetchChainActions()
		if err != nil {
			log.Errorf("unable to fetch chain actions: %v", err)
			return err
		}
		for _, htlcs := range chainActions {
			confirmedHTLCs = append(confirmedHTLCs, htlcs...)
		}
	}

	// Reconstruct the htlc outpoints and data from the chain action log.
	// The purpose of the constructed htlc map is to supplement the
	// resolvers restored from database with extra data. Ideally this data
	// is stored as part of the resolver in the log. This is a workaround
	// to prevent a db migration. We use all available htlc sets here in
	// order to ensure we have complete coverage.
	htlcMap := make(map[wire.OutPoint]*channeldb.HTLC)
	for _, htlc := range confirmedHTLCs {
		// Copy the loop variable so that each map entry points at its
		// own HTLC rather than at the shared iteration variable.
		htlc := htlc
		outpoint := wire.OutPoint{
			Hash:  commitHash,
			Index: uint32(htlc.OutputIndex),
		}
		htlcMap[outpoint] = &htlc
	}

	// We'll also fetch the historical state of this channel, as it should
	// have been marked as closed by now, and supplement it to each resolver
	// such that we can properly resolve our pending contracts.
	var chanState *channeldb.OpenChannel
	chanState, err = c.cfg.FetchHistoricalChannel()
	switch {
	// If we don't find this channel, then it may be the case that it
	// was closed before we started to retain the final state
	// information for open channels.
	case err == channeldb.ErrNoHistoricalBucket:
		fallthrough
	case err == channeldb.ErrChannelNotFound:
		log.Warnf("ChannelArbitrator(%v): unable to fetch historical "+
			"state", c.cfg.ChanPoint)

	case err != nil:
		return err
	}

	log.Infof("ChannelArbitrator(%v): relaunching %v contract "+
		"resolvers", c.cfg.ChanPoint, len(unresolvedContracts))

	for _, resolver := range unresolvedContracts {
		if chanState != nil {
			resolver.SupplementState(chanState)
		}

		// Only HTLC resolvers need the extra HTLC data, all other
		// resolver types are done at this point.
		htlcResolver, ok := resolver.(htlcContractResolver)
		if !ok {
			continue
		}

		htlcPoint := htlcResolver.HtlcPoint()
		htlc, ok := htlcMap[htlcPoint]
		if !ok {
			return fmt.Errorf(
				"htlc resolver %T unavailable", resolver,
			)
		}

		htlcResolver.Supplement(*htlc)
	}

	// The anchor resolver is stateless and can always be re-instantiated.
	if contractResolutions.AnchorResolution != nil {
		anchorResolver := newAnchorResolver(
			contractResolutions.AnchorResolution.AnchorSignDescriptor,
			contractResolutions.AnchorResolution.CommitAnchor,
			heightHint, c.cfg.ChanPoint,
			ResolverConfig{
				ChannelArbitratorConfig: c.cfg,
			},
		)
		unresolvedContracts = append(unresolvedContracts, anchorResolver)
	}

	c.launchResolvers(unresolvedContracts)

	return nil
}
// Report collects the contract reports of all active resolvers. Resolvers
// that are not reporting resolvers, as well as resolvers whose report is nil,
// are left out of the result. The resolver slice is read under the read lock.
func (c *ChannelArbitrator) Report() []*ContractReport {
	c.activeResolversLock.RLock()
	defer c.activeResolversLock.RUnlock()

	var collected []*ContractReport
	for i := range c.activeResolvers {
		reporter, isReporter := c.activeResolvers[i].(reportingContractResolver)
		if !isReporter {
			continue
		}

		if rep := reporter.report(); rep != nil {
			collected = append(collected, rep)
		}
	}

	return collected
}
// Stop signals the ChannelArbitrator for a graceful shutdown. Only the first
// call does any work, any subsequent call returns nil right away. The chain
// event subscription is cancelled in the background, every active resolver is
// stopped, the quit channel is closed, and finally we wait for all goroutines
// tracked by the wait group to exit.
func (c *ChannelArbitrator) Stop() error {
	alreadyStopped := !atomic.CompareAndSwapInt32(&c.stopped, 0, 1)
	if alreadyStopped {
		return nil
	}

	log.Debugf("Stopping ChannelArbitrator(%v)", c.cfg.ChanPoint)

	// The cancel closure is optional, and is run in its own goroutine so
	// it can't hold up the rest of the shutdown.
	if cancel := c.cfg.ChainEvents.Cancel; cancel != nil {
		go cancel()
	}

	// Stop every resolver while holding the read lock on the slice.
	c.activeResolversLock.RLock()
	for i := range c.activeResolvers {
		c.activeResolvers[i].Stop()
	}
	c.activeResolversLock.RUnlock()

	close(c.quit)
	c.wg.Wait()

	return nil
}
// transitionTrigger is an enum that denotes exactly *why* a state transition
// was initiated. This is useful as depending on the initial trigger, we may
// skip certain states as those actions are expected to have already taken
// place as a result of the external trigger.
type transitionTrigger uint8

const (
	// chainTrigger is a transition trigger that has been attempted due to
	// changing on-chain conditions such as a block which times out HTLC's
	// being attached.
	chainTrigger transitionTrigger = iota

	// userTrigger is a transition trigger driven by user action. Examples
	// of such a trigger include a user requesting a force closure of the
	// channel.
	userTrigger

	// remoteCloseTrigger is a transition trigger driven by the remote
	// peer's commitment being confirmed.
	remoteCloseTrigger

	// localCloseTrigger is a transition trigger driven by our commitment
	// being confirmed.
	localCloseTrigger

	// coopCloseTrigger is a transition trigger driven by a cooperative
	// close transaction being confirmed.
	coopCloseTrigger

	// breachCloseTrigger is a transition trigger driven by a remote breach
	// being confirmed. In this case the channel arbitrator will wait for
	// the breacharbiter to finish and then clean up gracefully.
	breachCloseTrigger
)

// String returns the name of the passed transitionTrigger. Any value outside
// of the declared set of triggers is rendered as "unknown trigger".
func (t transitionTrigger) String() string {
	// The table is indexed by the trigger value itself, so it covers
	// exactly the declared triggers.
	names := [...]string{
		chainTrigger:       "chainTrigger",
		userTrigger:        "userTrigger",
		remoteCloseTrigger: "remoteCloseTrigger",
		localCloseTrigger:  "localCloseTrigger",
		coopCloseTrigger:   "coopCloseTrigger",
		breachCloseTrigger: "breachCloseTrigger",
	}

	if int(t) < len(names) {
		return names[t]
	}

	return "unknown trigger"
}
// stateStep is a help method that examines our internal state, and attempts
// the appropriate state transition if necessary. The next state we transition
// to is returned, Additionally, if the next transition results in a commitment
// broadcast, the commitment transaction itself is returned.
func (c *ChannelArbitrator) stateStep(
triggerHeight uint32, trigger transitionTrigger,
confCommitSet *CommitSet) (ArbitratorState, *wire.MsgTx, error) {
var (
nextState ArbitratorState
closeTx *wire.MsgTx
)
switch c.state {
// If we're in the default state, then we'll check our set of actions
// to see if while we were down, conditions have changed.
case StateDefault:
log.Debugf("ChannelArbitrator(%v): new block (height=%v) "+
"examining active HTLC's", c.cfg.ChanPoint,
triggerHeight)
// As a new block has been connected to the end of the main
// chain, we'll check to see if we need to make any on-chain
// claims on behalf of the channel contract that we're
// arbitrating for. If a commitment has confirmed, then we'll
// use the set snapshot from the chain, otherwise we'll use our
// current set.
var htlcs map[HtlcSetKey]htlcSet
if confCommitSet != nil {
htlcs = confCommitSet.toActiveHTLCSets()
} else {
// Update the set of activeHTLCs so
// checkLocalChainActions has an up-to-date view of the
// commitments.
c.updateActiveHTLCs()
htlcs = c.activeHTLCs
}
chainActions, err := c.checkLocalChainActions(
triggerHeight, trigger, htlcs, false,
)
if err != nil {
return StateDefault, nil, err
}
// If there are no actions to be made, then we'll remain in the
// default state. If this isn't a self initiated event (we're
// checking due to a chain update), then we'll exit now.
if len(chainActions) == 0 && trigger == chainTrigger {
log.Debugf("ChannelArbitrator(%v): no actions for "+
"chain trigger, terminating", c.cfg.ChanPoint)
return StateDefault, closeTx, nil
}
// Otherwise, we'll log that we checked the HTLC actions as the
// commitment transaction has already been broadcast.
log.Tracef("ChannelArbitrator(%v): logging chain_actions=%v",
c.cfg.ChanPoint,
newLogClosure(func() string {
return spew.Sdump(chainActions)
}))
// Depending on the type of trigger, we'll either "tunnel"
// through to a farther state, or just proceed linearly to the
// next state.
switch trigger {
// If this is a chain trigger, then we'll go straight to the
// next state, as we still need to broadcast the commitment
// transaction.
case chainTrigger:
fallthrough
case userTrigger:
nextState = StateBroadcastCommit
// If the trigger is a cooperative close being confirmed, then
// we can go straight to StateFullyResolved, as there won't be
// any contracts to resolve.
case coopCloseTrigger:
nextState = StateFullyResolved
// Otherwise, if this state advance was triggered by a
// commitment being confirmed on chain, then we'll jump
// straight to the state where the contract has already been
// closed, and we will inspect the set of unresolved contracts.
case localCloseTrigger:
log.Errorf("ChannelArbitrator(%v): unexpected local "+
"commitment confirmed while in StateDefault",
c.cfg.ChanPoint)
fallthrough
case remoteCloseTrigger:
nextState = StateContractClosed
case breachCloseTrigger:
nextContractState, err := c.checkLegacyBreach()
if nextContractState == StateError {
return nextContractState, nil, err
}
nextState = nextContractState
}
// If we're in this state, then we've decided to broadcast the
// commitment transaction. We enter this state either due to an outside
// sub-system, or because an on-chain action has been triggered.
case StateBroadcastCommit:
// Under normal operation, we can only enter
// StateBroadcastCommit via a user or chain trigger. On restart,
// this state may be reexecuted after closing the channel, but
// failing to commit to StateContractClosed or
// StateFullyResolved. In that case, one of the four close
// triggers will be presented, signifying that we should skip
// rebroadcasting, and go straight to resolving the on-chain
// contract or marking the channel resolved.
switch trigger {
case localCloseTrigger, remoteCloseTrigger:
log.Infof("ChannelArbitrator(%v): detected %s "+
"close after closing channel, fast-forwarding "+
"to %s to resolve contract",
c.cfg.ChanPoint, trigger, StateContractClosed)
return StateContractClosed, closeTx, nil
case breachCloseTrigger:
nextContractState, err := c.checkLegacyBreach()
if nextContractState == StateError {
log.Infof("ChannelArbitrator(%v): unable to "+
"advance breach close resolution: %v",
c.cfg.ChanPoint, nextContractState)
return StateError, closeTx, err
}
log.Infof("ChannelArbitrator(%v): detected %s close "+
"after closing channel, fast-forwarding to %s"+
" to resolve contract", c.cfg.ChanPoint,
trigger, nextContractState)
return nextContractState, closeTx, nil
case coopCloseTrigger:
log.Infof("ChannelArbitrator(%v): detected %s "+
"close after closing channel, fast-forwarding "+
"to %s to resolve contract",
c.cfg.ChanPoint, trigger, StateFullyResolved)
return StateFullyResolved, closeTx, nil
}
log.Infof("ChannelArbitrator(%v): force closing "+
"chan", c.cfg.ChanPoint)
// Now that we have all the actions decided for the set of
// HTLC's, we'll broadcast the commitment transaction, and
// signal the link to exit.
// We'll tell the switch that it should remove the link for
// this channel, in addition to fetching the force close
// summary needed to close this channel on chain.
closeSummary, err := c.cfg.Channel.ForceCloseChan()
if err != nil {
log.Errorf("ChannelArbitrator(%v): unable to "+
"force close: %v", c.cfg.ChanPoint, err)
return StateError, closeTx, err
}
closeTx = closeSummary.CloseTx
// Before publishing the transaction, we store it to the
// database, such that we can re-publish later in case it
// didn't propagate. We initiated the force close, so we
// mark broadcast with local initiator set to true.
err = c.cfg.MarkCommitmentBroadcasted(closeTx, true)
if err != nil {
log.Errorf("ChannelArbitrator(%v): unable to "+
"mark commitment broadcasted: %v",
c.cfg.ChanPoint, err)
return StateError, closeTx, err
}
// With the close transaction in hand, broadcast the
// transaction to the network, thereby entering the post
// channel resolution state.
log.Infof("Broadcasting force close transaction %v, "+
"ChannelPoint(%v): %v", closeTx.TxHash(),
c.cfg.ChanPoint,
newLogClosure(func() string {
return spew.Sdump(closeTx)
}))
// At this point, we'll now broadcast the commitment
// transaction itself.
label := labels.MakeLabel(
labels.LabelTypeChannelClose, &c.cfg.ShortChanID,
)
if err := c.cfg.PublishTx(closeTx, label); err != nil {
log.Errorf("ChannelArbitrator(%v): unable to broadcast "+
"close tx: %v", c.cfg.ChanPoint, err)
if err != lnwallet.ErrDoubleSpend {
return StateError, closeTx, err
}
}