-
Notifications
You must be signed in to change notification settings - Fork 2k
/
fee_bumper.go
1347 lines (1100 loc) · 40.3 KB
/
fee_bumper.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
package sweep
import (
"errors"
"fmt"
"sync"
"sync/atomic"
"github.com/btcsuite/btcd/btcutil"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/rpcclient"
"github.com/btcsuite/btcd/txscript"
"github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btcwallet/chain"
"github.com/davecgh/go-spew/spew"
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/fn"
"github.com/lightningnetwork/lnd/input"
"github.com/lightningnetwork/lnd/labels"
"github.com/lightningnetwork/lnd/lnutils"
"github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwallet/chainfee"
)
var (
// ErrInvalidBumpResult is returned when the bump result is invalid.
ErrInvalidBumpResult = errors.New("invalid bump result")
// ErrNotEnoughBudget is returned when the fee bumper decides the
// current budget cannot cover the fee.
ErrNotEnoughBudget = errors.New("not enough budget")
// ErrLocktimeImmature is returned when sweeping an input whose
// locktime is not reached.
ErrLocktimeImmature = errors.New("immature input")
// ErrTxNoOutput is returned when an output cannot be created during tx
// preparation, usually due to the output being dust.
ErrTxNoOutput = errors.New("tx has no output")
// ErrThirdPartySpent is returned when a third party has spent the
// input in the sweeping tx.
ErrThirdPartySpent = errors.New("third party spent the output")
)
// Bumper defines an interface that can be used by other subsystems for fee
// bumping.
type Bumper interface {
// Broadcast is used to publish the tx created from the given inputs
// specified in the request. It handles the tx creation, broadcasts it,
// and monitors its confirmation status for potential fee bumping. It
// returns a chan that the caller can use to receive updates about the
// broadcast result and potential RBF attempts.
Broadcast(req *BumpRequest) (<-chan *BumpResult, error)
}
// BumpEvent represents the event of a fee bumping attempt.
type BumpEvent uint8
const (
// TxPublished is sent when the broadcast attempt is finished.
TxPublished BumpEvent = iota
// TxFailed is sent when the broadcast attempt fails.
TxFailed
// TxReplaced is sent when the original tx is replaced by a new one.
TxReplaced
// TxConfirmed is sent when the tx is confirmed.
TxConfirmed
// sentinalEvent is used to check if an event is unknown.
sentinalEvent
)
// String returns a human-readable string for the event.
func (e BumpEvent) String() string {
switch e {
case TxPublished:
return "Published"
case TxFailed:
return "Failed"
case TxReplaced:
return "Replaced"
case TxConfirmed:
return "Confirmed"
default:
return "Unknown"
}
}
// Unknown returns true if the event is unknown.
func (e BumpEvent) Unknown() bool {
return e >= sentinalEvent
}
// BumpRequest is used by the caller to give the Bumper the necessary info to
// create and manage potential fee bumps for a set of inputs.
type BumpRequest struct {
// Budget givens the total amount that can be used as fees by these
// inputs.
Budget btcutil.Amount
// Inputs is the set of inputs to sweep.
Inputs []input.Input
// DeadlineHeight is the block height at which the tx should be
// confirmed.
DeadlineHeight int32
// DeliveryAddress is the script to send the change output to.
DeliveryAddress []byte
// MaxFeeRate is the maximum fee rate that can be used for fee bumping.
MaxFeeRate chainfee.SatPerKWeight
// StartingFeeRate is an optional parameter that can be used to specify
// the initial fee rate to use for the fee function.
StartingFeeRate fn.Option[chainfee.SatPerKWeight]
}
// MaxFeeRateAllowed returns the maximum fee rate allowed for the given
// request. It calculates the feerate using the supplied budget and the weight,
// compares it with the specified MaxFeeRate, and returns the smaller of the
// two.
func (r *BumpRequest) MaxFeeRateAllowed() (chainfee.SatPerKWeight, error) {
// Get the size of the sweep tx, which will be used to calculate the
// budget fee rate.
size, err := calcSweepTxWeight(r.Inputs, r.DeliveryAddress)
if err != nil {
return 0, err
}
// Use the budget and MaxFeeRate to decide the max allowed fee rate.
// This is needed as, when the input has a large value and the user
// sets the budget to be proportional to the input value, the fee rate
// can be very high and we need to make sure it doesn't exceed the max
// fee rate.
maxFeeRateAllowed := chainfee.NewSatPerKWeight(r.Budget, size)
if maxFeeRateAllowed > r.MaxFeeRate {
log.Debugf("Budget feerate %v exceeds MaxFeeRate %v, use "+
"MaxFeeRate instead, txWeight=%v", maxFeeRateAllowed,
r.MaxFeeRate, size)
return r.MaxFeeRate, nil
}
log.Debugf("Budget feerate %v below MaxFeeRate %v, use budget feerate "+
"instead, txWeight=%v", maxFeeRateAllowed, r.MaxFeeRate, size)
return maxFeeRateAllowed, nil
}
// calcSweepTxWeight calculates the weight of the sweep tx. It assumes a
// sweeping tx always has a single output(change).
func calcSweepTxWeight(inputs []input.Input,
outputPkScript []byte) (uint64, error) {
// Use a const fee rate as we only use the weight estimator to
// calculate the size.
const feeRate = 1
// Initialize the tx weight estimator with,
// - nil outputs as we only have one single change output.
// - const fee rate as we don't care about the fees here.
// - 0 maxfeerate as we don't care about fees here.
//
// TODO(yy): we should refactor the weight estimator to not require a
// fee rate and max fee rate and make it a pure tx weight calculator.
_, estimator, err := getWeightEstimate(
inputs, nil, feeRate, 0, outputPkScript,
)
if err != nil {
return 0, err
}
return uint64(estimator.weight()), nil
}
// BumpResult is used by the Bumper to send updates about the tx being
// broadcast.
type BumpResult struct {
// Event is the type of event that the result is for.
Event BumpEvent
// Tx is the tx being broadcast.
Tx *wire.MsgTx
// ReplacedTx is the old, replaced tx if a fee bump is attempted.
ReplacedTx *wire.MsgTx
// FeeRate is the fee rate used for the new tx.
FeeRate chainfee.SatPerKWeight
// Fee is the fee paid by the new tx.
Fee btcutil.Amount
// Err is the error that occurred during the broadcast.
Err error
// requestID is the ID of the request that created this record.
requestID uint64
}
// Validate validates the BumpResult so it's safe to use.
func (b *BumpResult) Validate() error {
// Every result must have a tx.
if b.Tx == nil {
return fmt.Errorf("%w: nil tx", ErrInvalidBumpResult)
}
// Every result must have a known event.
if b.Event.Unknown() {
return fmt.Errorf("%w: unknown event", ErrInvalidBumpResult)
}
// If it's a replacing event, it must have a replaced tx.
if b.Event == TxReplaced && b.ReplacedTx == nil {
return fmt.Errorf("%w: nil replacing tx", ErrInvalidBumpResult)
}
// If it's a failed event, it must have an error.
if b.Event == TxFailed && b.Err == nil {
return fmt.Errorf("%w: nil error", ErrInvalidBumpResult)
}
// If it's a confirmed event, it must have a fee rate and fee.
if b.Event == TxConfirmed && (b.FeeRate == 0 || b.Fee == 0) {
return fmt.Errorf("%w: missing fee rate or fee",
ErrInvalidBumpResult)
}
return nil
}
// TxPublisherConfig is the config used to create a new TxPublisher.
type TxPublisherConfig struct {
// Signer is used to create the tx signature.
Signer input.Signer
// Wallet is used primarily to publish the tx.
Wallet Wallet
// Estimator is used to estimate the fee rate for the new tx based on
// its deadline conf target.
Estimator chainfee.Estimator
// Notifier is used to monitor the confirmation status of the tx.
Notifier chainntnfs.ChainNotifier
}
// TxPublisher is an implementation of the Bumper interface. It utilizes the
// `testmempoolaccept` RPC to bump the fee of txns it created based on
// different fee function selected or configed by the caller. Its purpose is to
// take a list of inputs specified, and create a tx that spends them to a
// specified output. It will then monitor the confirmation status of the tx,
// and if it's not confirmed within a certain time frame, it will attempt to
// bump the fee of the tx by creating a new tx that spends the same inputs to
// the same output, but with a higher fee rate. It will continue to do this
// until the tx is confirmed or the fee rate reaches the maximum fee rate
// specified by the caller.
type TxPublisher struct {
wg sync.WaitGroup
// cfg specifies the configuration of the TxPublisher.
cfg *TxPublisherConfig
// currentHeight is the current block height.
currentHeight atomic.Int32
// records is a map keyed by the requestCounter and the value is the tx
// being monitored.
records lnutils.SyncMap[uint64, *monitorRecord]
// requestCounter is a monotonically increasing counter used to keep
// track of how many requests have been made.
requestCounter atomic.Uint64
// subscriberChans is a map keyed by the requestCounter, each item is
// the chan that the publisher sends the fee bump result to.
subscriberChans lnutils.SyncMap[uint64, chan *BumpResult]
// quit is used to signal the publisher to stop.
quit chan struct{}
}
// Compile-time constraint to ensure TxPublisher implements Bumper.
var _ Bumper = (*TxPublisher)(nil)
// NewTxPublisher creates a new TxPublisher.
func NewTxPublisher(cfg TxPublisherConfig) *TxPublisher {
return &TxPublisher{
cfg: &cfg,
records: lnutils.SyncMap[uint64, *monitorRecord]{},
subscriberChans: lnutils.SyncMap[uint64, chan *BumpResult]{},
quit: make(chan struct{}),
}
}
// isNeutrinoBackend checks if the wallet backend is neutrino.
func (t *TxPublisher) isNeutrinoBackend() bool {
return t.cfg.Wallet.BackEnd() == "neutrino"
}
// Broadcast is used to publish the tx created from the given inputs. It will,
// 1. init a fee function based on the given strategy.
// 2. create an RBF-compliant tx and monitor it for confirmation.
// 3. notify the initial broadcast result back to the caller.
// The initial broadcast is guaranteed to be RBF-compliant unless the budget
// specified cannot cover the fee.
//
// NOTE: part of the Bumper interface.
func (t *TxPublisher) Broadcast(req *BumpRequest) (<-chan *BumpResult, error) {
log.Tracef("Received broadcast request: %s", newLogClosure(
func() string {
return spew.Sdump(req)
})())
// Attempt an initial broadcast which is guaranteed to comply with the
// RBF rules.
result, err := t.initialBroadcast(req)
if err != nil {
log.Errorf("Initial broadcast failed: %v", err)
return nil, err
}
// Create a chan to send the result to the caller.
subscriber := make(chan *BumpResult, 1)
t.subscriberChans.Store(result.requestID, subscriber)
// Send the initial broadcast result to the caller.
t.handleResult(result)
return subscriber, nil
}
// initialBroadcast initializes a fee function, creates an RBF-compliant tx and
// broadcasts it.
func (t *TxPublisher) initialBroadcast(req *BumpRequest) (*BumpResult, error) {
// Create a fee bumping algorithm to be used for future RBF.
feeAlgo, err := t.initializeFeeFunction(req)
if err != nil {
return nil, fmt.Errorf("init fee function: %w", err)
}
// Create the initial tx to be broadcasted. This tx is guaranteed to
// comply with the RBF restrictions.
requestID, err := t.createRBFCompliantTx(req, feeAlgo)
if err != nil {
return nil, fmt.Errorf("create RBF-compliant tx: %w", err)
}
// Broadcast the tx and return the monitored record.
result, err := t.broadcast(requestID)
if err != nil {
return nil, fmt.Errorf("broadcast sweep tx: %w", err)
}
return result, nil
}
// initializeFeeFunction initializes a fee function to be used for this request
// for future fee bumping.
func (t *TxPublisher) initializeFeeFunction(
req *BumpRequest) (FeeFunction, error) {
// Get the max allowed feerate.
maxFeeRateAllowed, err := req.MaxFeeRateAllowed()
if err != nil {
return nil, err
}
// Get the initial conf target.
confTarget := calcCurrentConfTarget(
t.currentHeight.Load(), req.DeadlineHeight,
)
log.Debugf("Initializing fee function with conf target=%v, budget=%v, "+
"maxFeeRateAllowed=%v", confTarget, req.Budget,
maxFeeRateAllowed)
// Initialize the fee function and return it.
//
// TODO(yy): return based on differet req.Strategy?
return NewLinearFeeFunction(
maxFeeRateAllowed, confTarget, t.cfg.Estimator,
req.StartingFeeRate,
)
}
// createRBFCompliantTx creates a tx that is compliant with RBF rules. It does
// so by creating a tx, validate it using `TestMempoolAccept`, and bump its fee
// and redo the process until the tx is valid, or return an error when non-RBF
// related errors occur or the budget has been used up.
func (t *TxPublisher) createRBFCompliantTx(req *BumpRequest,
f FeeFunction) (uint64, error) {
for {
// Create a new tx with the given fee rate and check its
// mempool acceptance.
tx, fee, err := t.createAndCheckTx(req, f)
switch {
case err == nil:
// The tx is valid, return the request ID.
requestID := t.storeRecord(tx, req, f, fee)
log.Infof("Created tx %v for %v inputs: feerate=%v, "+
"fee=%v, inputs=%v", tx.TxHash(),
len(req.Inputs), f.FeeRate(), fee,
inputTypeSummary(req.Inputs))
return requestID, nil
// If the error indicates the fees paid is not enough, we will
// ask the fee function to increase the fee rate and retry.
case errors.Is(err, lnwallet.ErrMempoolFee):
// We should at least start with a feerate above the
// mempool min feerate, so if we get this error, it
// means something is wrong earlier in the pipeline.
log.Errorf("Current fee=%v, feerate=%v, %v", fee,
f.FeeRate(), err)
fallthrough
// We are not paying enough fees so we increase it.
case errors.Is(err, rpcclient.ErrInsufficientFee):
increased := false
// Keep calling the fee function until the fee rate is
// increased or maxed out.
for !increased {
log.Debugf("Increasing fee for next round, "+
"current fee=%v, feerate=%v", fee,
f.FeeRate())
// If the fee function tells us that we have
// used up the budget, we will return an error
// indicating this tx cannot be made. The
// sweeper should handle this error and try to
// cluster these inputs differetly.
increased, err = f.Increment()
if err != nil {
return 0, err
}
}
// TODO(yy): suppose there's only one bad input, we can do a
// binary search to find out which input is causing this error
// by recreating a tx using half of the inputs and check its
// mempool acceptance.
default:
log.Debugf("Failed to create RBF-compliant tx: %v", err)
return 0, err
}
}
}
// storeRecord stores the given record in the records map.
func (t *TxPublisher) storeRecord(tx *wire.MsgTx, req *BumpRequest,
f FeeFunction, fee btcutil.Amount) uint64 {
// Increase the request counter.
//
// NOTE: this is the only place where we increase the
// counter.
requestID := t.requestCounter.Add(1)
// Register the record.
t.records.Store(requestID, &monitorRecord{
tx: tx,
req: req,
feeFunction: f,
fee: fee,
})
return requestID
}
// createAndCheckTx creates a tx based on the given inputs, change output
// script, and the fee rate. In addition, it validates the tx's mempool
// acceptance before returning a tx that can be published directly, along with
// its fee.
func (t *TxPublisher) createAndCheckTx(req *BumpRequest, f FeeFunction) (
*wire.MsgTx, btcutil.Amount, error) {
// Create the sweep tx with max fee rate of 0 as the fee function
// guarantees the fee rate used here won't exceed the max fee rate.
tx, fee, err := t.createSweepTx(
req.Inputs, req.DeliveryAddress, f.FeeRate(),
)
if err != nil {
return nil, fee, fmt.Errorf("create sweep tx: %w", err)
}
// Sanity check the budget still covers the fee.
if fee > req.Budget {
return nil, fee, fmt.Errorf("%w: budget=%v, fee=%v",
ErrNotEnoughBudget, req.Budget, fee)
}
// Validate the tx's mempool acceptance.
err = t.cfg.Wallet.CheckMempoolAcceptance(tx)
// Exit early if the tx is valid.
if err == nil {
return tx, fee, nil
}
// Print an error log if the chain backend doesn't support the mempool
// acceptance test RPC.
if errors.Is(err, rpcclient.ErrBackendVersion) {
log.Errorf("TestMempoolAccept not supported by backend, " +
"consider upgrading it to a newer version")
return tx, fee, nil
}
// We are running on a backend that doesn't implement the RPC
// testmempoolaccept, eg, neutrino, so we'll skip the check.
if errors.Is(err, chain.ErrUnimplemented) {
log.Debug("Skipped testmempoolaccept due to not implemented")
return tx, fee, nil
}
return nil, fee, fmt.Errorf("tx=%v failed mempool check: %w",
tx.TxHash(), err)
}
// broadcast takes a monitored tx and publishes it to the network. Prior to the
// broadcast, it will subscribe the tx's confirmation notification and attach
// the event channel to the record. Any broadcast-related errors will not be
// returned here, instead, they will be put inside the `BumpResult` and
// returned to the caller.
func (t *TxPublisher) broadcast(requestID uint64) (*BumpResult, error) {
// Get the record being monitored.
record, ok := t.records.Load(requestID)
if !ok {
return nil, fmt.Errorf("tx record %v not found", requestID)
}
txid := record.tx.TxHash()
tx := record.tx
log.Debugf("Publishing sweep tx %v, num_inputs=%v, height=%v",
txid, len(tx.TxIn), t.currentHeight.Load())
// Set the event, and change it to TxFailed if the wallet fails to
// publish it.
event := TxPublished
// Publish the sweeping tx with customized label. If the publish fails,
// this error will be saved in the `BumpResult` and it will be removed
// from being monitored.
err := t.cfg.Wallet.PublishTransaction(
tx, labels.MakeLabel(labels.LabelTypeSweepTransaction, nil),
)
if err != nil {
// NOTE: we decide to attach this error to the result instead
// of returning it here because by the time the tx reaches
// here, it should have passed the mempool acceptance check. If
// it still fails to be broadcast, it's likely a non-RBF
// related error happened. So we send this error back to the
// caller so that it can handle it properly.
//
// TODO(yy): find out which input is causing the failure.
log.Errorf("Failed to publish tx %v: %v", txid, err)
event = TxFailed
}
result := &BumpResult{
Event: event,
Tx: record.tx,
Fee: record.fee,
FeeRate: record.feeFunction.FeeRate(),
Err: err,
requestID: requestID,
}
return result, nil
}
// notifyResult sends the result to the resultChan specified by the requestID.
// This channel is expected to be read by the caller.
func (t *TxPublisher) notifyResult(result *BumpResult) {
id := result.requestID
subscriber, ok := t.subscriberChans.Load(id)
if !ok {
log.Errorf("Result chan for id=%v not found", id)
return
}
log.Debugf("Sending result for requestID=%v, tx=%v", id,
result.Tx.TxHash())
select {
// Send the result to the subscriber.
//
// TODO(yy): Add timeout in case it's blocking?
case subscriber <- result:
case <-t.quit:
log.Debug("Fee bumper stopped")
}
}
// removeResult removes the tracking of the result if the result contains a
// non-nil error, or the tx is confirmed, the record will be removed from the
// maps.
func (t *TxPublisher) removeResult(result *BumpResult) {
id := result.requestID
// Remove the record from the maps if there's an error. This means this
// tx has failed its broadcast and cannot be retried. There are two
// cases,
// - when the budget cannot cover the fee.
// - when a non-RBF related error occurs.
switch result.Event {
case TxFailed:
log.Errorf("Removing monitor record=%v, tx=%v, due to err: %v",
id, result.Tx.TxHash(), result.Err)
case TxConfirmed:
// Remove the record is the tx is confirmed.
log.Debugf("Removing confirmed monitor record=%v, tx=%v", id,
result.Tx.TxHash())
// Do nothing if it's neither failed or confirmed.
default:
log.Tracef("Skipping record removal for id=%v, event=%v", id,
result.Event)
return
}
t.records.Delete(id)
t.subscriberChans.Delete(id)
}
// handleResult handles the result of a tx broadcast. It will notify the
// subscriber and remove the record if the tx is confirmed or failed to be
// broadcast.
func (t *TxPublisher) handleResult(result *BumpResult) {
// Notify the subscriber.
t.notifyResult(result)
// Remove the record if it's failed or confirmed.
t.removeResult(result)
}
// monitorRecord is used to keep track of the tx being monitored by the
// publisher internally.
type monitorRecord struct {
// tx is the tx being monitored.
tx *wire.MsgTx
// req is the original request.
req *BumpRequest
// feeFunction is the fee bumping algorithm used by the publisher.
feeFunction FeeFunction
// fee is the fee paid by the tx.
fee btcutil.Amount
}
// Start starts the publisher by subscribing to block epoch updates and kicking
// off the monitor loop.
func (t *TxPublisher) Start() error {
log.Info("TxPublisher starting...")
defer log.Debugf("TxPublisher started")
blockEvent, err := t.cfg.Notifier.RegisterBlockEpochNtfn(nil)
if err != nil {
return fmt.Errorf("register block epoch ntfn: %w", err)
}
t.wg.Add(1)
go t.monitor(blockEvent)
return nil
}
// Stop stops the publisher and waits for the monitor loop to exit.
func (t *TxPublisher) Stop() {
log.Info("TxPublisher stopping...")
defer log.Debugf("TxPublisher stopped")
close(t.quit)
t.wg.Wait()
}
// monitor is the main loop driven by new blocks. Whevenr a new block arrives,
// it will examine all the txns being monitored, and check if any of them needs
// to be bumped. If so, it will attempt to bump the fee of the tx.
//
// NOTE: Must be run as a goroutine.
func (t *TxPublisher) monitor(blockEvent *chainntnfs.BlockEpochEvent) {
defer blockEvent.Cancel()
defer t.wg.Done()
for {
select {
case epoch, ok := <-blockEvent.Epochs:
if !ok {
// We should stop the publisher before stopping
// the chain service. Otherwise it indicates an
// error.
log.Error("Block epoch channel closed, exit " +
"monitor")
return
}
log.Debugf("TxPublisher received new block: %v",
epoch.Height)
// Update the best known height for the publisher.
t.currentHeight.Store(epoch.Height)
// Check all monitored txns to see if any of them needs
// to be bumped.
t.processRecords()
case <-t.quit:
log.Debug("Fee bumper stopped, exit monitor")
return
}
}
}
// processRecords checks all the txns being monitored, and checks if any of
// them needs to be bumped. If so, it will attempt to bump the fee of the tx.
func (t *TxPublisher) processRecords() {
// confirmedRecords stores a map of the records which have been
// confirmed.
confirmedRecords := make(map[uint64]*monitorRecord)
// feeBumpRecords stores a map of the records which need to be bumped.
feeBumpRecords := make(map[uint64]*monitorRecord)
// failedRecords stores a map of the records which has inputs being
// spent by a third party.
//
// NOTE: this is only used for neutrino backend.
failedRecords := make(map[uint64]*monitorRecord)
// visitor is a helper closure that visits each record and divides them
// into two groups.
visitor := func(requestID uint64, r *monitorRecord) error {
log.Tracef("Checking monitor recordID=%v for tx=%v", requestID,
r.tx.TxHash())
// If the tx is already confirmed, we can stop monitoring it.
if t.isConfirmed(r.tx.TxHash()) {
confirmedRecords[requestID] = r
// Move to the next record.
return nil
}
// Check whether the inputs has been spent by a third party.
//
// NOTE: this check is only done for neutrino backend.
if t.isThirdPartySpent(r.tx.TxHash(), r.req.Inputs) {
failedRecords[requestID] = r
// Move to the next record.
return nil
}
feeBumpRecords[requestID] = r
// Return nil to move to the next record.
return nil
}
// Iterate through all the records and divide them into two groups.
t.records.ForEach(visitor)
// For records that are confirmed, we'll notify the caller about this
// result.
for requestID, r := range confirmedRecords {
rec := r
log.Debugf("Tx=%v is confirmed", r.tx.TxHash())
t.wg.Add(1)
go t.handleTxConfirmed(rec, requestID)
}
// Get the current height to be used in the following goroutines.
currentHeight := t.currentHeight.Load()
// For records that are not confirmed, we perform a fee bump if needed.
for requestID, r := range feeBumpRecords {
rec := r
log.Debugf("Attempting to fee bump Tx=%v", r.tx.TxHash())
t.wg.Add(1)
go t.handleFeeBumpTx(requestID, rec, currentHeight)
}
// For records that are failed, we'll notify the caller about this
// result.
for requestID, r := range failedRecords {
rec := r
log.Debugf("Tx=%v has inputs been spent by a third party, "+
"failing it now", r.tx.TxHash())
t.wg.Add(1)
go t.handleThirdPartySpent(rec, requestID)
}
}
// handleTxConfirmed is called when a monitored tx is confirmed. It will
// notify the subscriber then remove the record from the maps .
//
// NOTE: Must be run as a goroutine to avoid blocking on sending the result.
func (t *TxPublisher) handleTxConfirmed(r *monitorRecord, requestID uint64) {
defer t.wg.Done()
// Create a result that will be sent to the resultChan which is
// listened by the caller.
result := &BumpResult{
Event: TxConfirmed,
Tx: r.tx,
requestID: requestID,
Fee: r.fee,
FeeRate: r.feeFunction.FeeRate(),
}
// Notify that this tx is confirmed and remove the record from the map.
t.handleResult(result)
}
// handleFeeBumpTx checks if the tx needs to be bumped, and if so, it will
// attempt to bump the fee of the tx.
//
// NOTE: Must be run as a goroutine to avoid blocking on sending the result.
func (t *TxPublisher) handleFeeBumpTx(requestID uint64, r *monitorRecord,
currentHeight int32) {
defer t.wg.Done()
oldTxid := r.tx.TxHash()
// Get the current conf target for this record.
confTarget := calcCurrentConfTarget(currentHeight, r.req.DeadlineHeight)
// Ask the fee function whether a bump is needed. We expect the fee
// function to increase its returned fee rate after calling this
// method.
increased, err := r.feeFunction.IncreaseFeeRate(confTarget)
if err != nil {
// TODO(yy): send this error back to the sweeper so it can
// re-group the inputs?
log.Errorf("Failed to increase fee rate for tx %v at "+
"height=%v: %v", oldTxid, t.currentHeight.Load(), err)
return
}
// If the fee rate was not increased, there's no need to bump the fee.
if !increased {
log.Tracef("Skip bumping tx %v at height=%v", oldTxid,
t.currentHeight.Load())
return
}
// The fee function now has a new fee rate, we will use it to bump the
// fee of the tx.
resultOpt := t.createAndPublishTx(requestID, r)
// If there's a result, we will notify the caller about the result.
resultOpt.WhenSome(func(result BumpResult) {
// Notify the new result.
t.handleResult(&result)
})
}
// handleThirdPartySpent is called when the inputs in an unconfirmed tx is
// spent. It will notify the subscriber then remove the record from the maps
// and send a TxFailed event to the subscriber.
//
// NOTE: Must be run as a goroutine to avoid blocking on sending the result.
func (t *TxPublisher) handleThirdPartySpent(r *monitorRecord,
requestID uint64) {
defer t.wg.Done()
// Create a result that will be sent to the resultChan which is
// listened by the caller.
//
// TODO(yy): create a new state `TxThirdPartySpent` to notify the
// sweeper to remove the input, hence moving the monitoring of inputs
// spent inside the fee bumper.
result := &BumpResult{
Event: TxFailed,
Tx: r.tx,
requestID: requestID,
Err: ErrThirdPartySpent,
}
// Notify that this tx is confirmed and remove the record from the map.
t.handleResult(result)
}
// createAndPublishTx creates a new tx with a higher fee rate and publishes it
// to the network. It will update the record with the new tx and fee rate if
// successfully created, and return the result when published successfully.
func (t *TxPublisher) createAndPublishTx(requestID uint64,
r *monitorRecord) fn.Option[BumpResult] {
// Fetch the old tx.
oldTx := r.tx
// Create a new tx with the new fee rate.
//
// NOTE: The fee function is expected to have increased its returned
// fee rate after calling the SkipFeeBump method. So we can use it
// directly here.
tx, fee, err := t.createAndCheckTx(r.req, r.feeFunction)
// If the error is fee related, we will return no error and let the fee
// bumper retry it at next block.
//
// NOTE: we can check the RBF error here and ask the fee function to
// recalculate the fee rate. However, this would defeat the purpose of
// using a deadline based fee function:
// - if the deadline is far away, there's no rush to RBF the tx.
// - if the deadline is close, we expect the fee function to give us a
// higher fee rate. If the fee rate cannot satisfy the RBF rules, it
// means the budget is not enough.
if errors.Is(err, rpcclient.ErrInsufficientFee) ||
errors.Is(err, lnwallet.ErrMempoolFee) {
log.Debugf("Failed to bump tx %v: %v", oldTx.TxHash(), err)
return fn.None[BumpResult]()
}
// If the error is not fee related, we will return a `TxFailed` event
// so this input can be retried.
if err != nil {
// If the tx doesn't not have enought budget, we will return a
// result so the sweeper can handle it by re-clustering the
// utxos.
if errors.Is(err, ErrNotEnoughBudget) {
log.Warnf("Fail to fee bump tx %v: %v", oldTx.TxHash(),
err)
} else {
// Otherwise, an unexpected error occurred, we will
// fail the tx and let the sweeper retry the whole
// process.
log.Errorf("Failed to bump tx %v: %v", oldTx.TxHash(),
err)
}
return fn.Some(BumpResult{
Event: TxFailed,
Tx: oldTx,
Err: err,
requestID: requestID,
})
}
// The tx has been created without any errors, we now register a new
// record by overwriting the same requestID.
t.records.Store(requestID, &monitorRecord{
tx: tx,
req: r.req,
feeFunction: r.feeFunction,
fee: fee,
})
// Attempt to broadcast this new tx.
result, err := t.broadcast(requestID)
if err != nil {
log.Infof("Failed to broadcast replacement tx %v: %v",
tx.TxHash(), err)
return fn.None[BumpResult]()
}
// If the result error is fee related, we will return no error and let
// the fee bumper retry it at next block.
//
// NOTE: we may get this error if we've bypassed the mempool check,
// which means we are suing neutrino backend.
if errors.Is(result.Err, rpcclient.ErrInsufficientFee) ||
errors.Is(result.Err, lnwallet.ErrMempoolFee) {
log.Debugf("Failed to bump tx %v: %v", oldTx.TxHash(), err)
return fn.None[BumpResult]()
}
// A successful replacement tx is created, attach the old tx.
result.ReplacedTx = oldTx