Permalink
Browse files

peer+htlcswitch: randomize link commitment fee updates

In this commit, we modify the behavior of links updating their
commitment fees. Rather than attempting to update the commitment fee for
each link every time a new block comes in, we'll use a timer with a
random interval between 10 and 60 minutes for each link to determine
when to update their corresponding commitment fee. This prevents us from
oscillating the fee rate for our various commitment transactions.
  • Loading branch information...
wpaulino committed May 10, 2018
1 parent c1a1b3b commit 4cc60493d233b4dcb4bc1ffab85a56f135d061e5
Showing with 111 additions and 93 deletions.
  1. +42 −12 htlcswitch/link.go
  2. +20 −52 htlcswitch/link_test.go
  3. +38 −20 htlcswitch/test_utils.go
  4. +11 −9 peer.go
@@ -3,6 +3,7 @@ package htlcswitch
import (
"bytes"
"fmt"
prand "math/rand"
"sync"
"sync/atomic"
"time"
@@ -21,6 +22,10 @@ import (
"github.com/roasbeef/btcd/chaincfg/chainhash"
)
func init() {
prand.Seed(time.Now().UnixNano())
}
const (
// expiryGraceDelta is a grace period that the timeout of incoming
// HTLC's that pay directly to us (i.e we're the "exit node") must up
@@ -36,6 +41,12 @@ const (
// for a new fee update. We'll use this as a fee floor when proposing
// and accepting updates.
minCommitFeePerKw = 253
// DefaultMinLinkFeeUpdateTimeout and DefaultMaxLinkFeeUpdateTimeout
// represent the default timeout bounds in which a link should propose
// to update its commitment fee rate.
DefaultMinLinkFeeUpdateTimeout = 10 * time.Minute
DefaultMaxLinkFeeUpdateTimeout = 60 * time.Minute
)
// ForwardingPolicy describes the set of constraints that a given ChannelLink
@@ -248,6 +259,12 @@ type ChannelLinkConfig struct {
// in testing, it is here to ensure the sphinx replay detection on the
// receiving node is persistent.
UnsafeReplay bool
// MinFeeUpdateTimeout and MaxFeeUpdateTimeout represent the timeout
// interval bounds in which a link will propose to update its commitment
// fee rate. A random timeout will be selected between these values.
MinFeeUpdateTimeout time.Duration
MaxFeeUpdateTimeout time.Duration
}
// channelLink is the service which drives a channel's commitment update
@@ -342,6 +359,10 @@ type channelLink struct {
logCommitTimer *time.Timer
logCommitTick <-chan time.Time
// updateFeeTimer is the timer responsible for updating the link's
// commitment fee every time it fires.
updateFeeTimer *time.Timer
sync.RWMutex
wg sync.WaitGroup
@@ -427,6 +448,8 @@ func (l *channelLink) Start() error {
}
}
l.updateFeeTimer = time.NewTimer(l.randomFeeUpdateTimeout())
l.wg.Add(1)
go l.htlcManager()
@@ -449,8 +472,8 @@ func (l *channelLink) Stop() {
l.cfg.ChainEvents.Cancel()
}
l.updateFeeTimer.Stop()
l.channel.Stop()
l.overflowQueue.Stop()
close(l.quit)
@@ -835,7 +858,6 @@ func (l *channelLink) htlcManager() {
out:
for {
// We must always check if we failed at some point processing
// the last update before processing the next.
if l.failed {
@@ -844,16 +866,10 @@ out:
}
select {
// A new block has arrived, we'll check the network fee to see
// if we should adjust our commitment fee, and also update our
// track of the best current height.
case blockEpoch, ok := <-l.cfg.BlockEpochs.Epochs:
if !ok {
break out
}
l.bestHeight = uint32(blockEpoch.Height)
// Our update fee timer has fired, so we'll check the network
// fee to see if we should adjust our commitment fee.
case <-l.updateFeeTimer.C:
l.updateFeeTimer.Reset(l.randomFeeUpdateTimeout())
// If we're not the initiator of the channel, don't we
// don't control the fees, so we can ignore this.
@@ -983,6 +999,20 @@ out:
}
}
// randomFeeUpdateTimeout returns a random timeout between the bounds defined
// within the link's configuration that will be used to determine when the link
// should propose an update to its commitment fee rate.
func (l *channelLink) randomFeeUpdateTimeout() time.Duration {
lower := int64(l.cfg.MinFeeUpdateTimeout)
upper := int64(l.cfg.MaxFeeUpdateTimeout)
rand := prand.Int63n(upper)
if rand < lower {
rand = lower
}
return time.Duration(rand)
}
// handleDownStreamPkt processes an HTLC packet sent from the downstream HTLC
// Switch. Possible messages sent by the switch include requests to forward new
// HTLCs, timeout previously cleared HTLCs, and finally to settle currently
@@ -1498,9 +1498,11 @@ func newSingleLinkTestHarness(chanAmt, chanReserve btcutil.Amount) (
BlockEpochs: globalEpoch,
BatchTicker: ticker,
FwdPkgGCTicker: NewBatchTicker(time.NewTicker(5 * time.Second)),
// Make the BatchSize large enough to not
// trigger commit update automatically during tests.
BatchSize: 10000,
// Make the BatchSize and Min/MaxFeeUpdateTimeout large enough
// to not trigger commit updates automatically during tests.
BatchSize: 10000,
MinFeeUpdateTimeout: 30 * time.Minute,
MaxFeeUpdateTimeout: 30 * time.Minute,
}
const startingHeight = 100
@@ -3451,22 +3453,9 @@ func TestChannelLinkUpdateCommitFee(t *testing.T) {
defer n.stop()
defer n.feeEstimator.Stop()
// First, we'll start off all channels at "height" 9000 by sending a
// new epoch to all the clients.
select {
case n.aliceBlockEpoch <- &chainntnfs.BlockEpoch{
Height: 9000,
}:
case <-time.After(time.Second * 5):
t.Fatalf("link didn't read block epoch")
}
select {
case n.bobFirstBlockEpoch <- &chainntnfs.BlockEpoch{
Height: 9000,
}:
case <-time.After(time.Second * 5):
t.Fatalf("link didn't read block epoch")
}
// For the sake of this test, we'll reset the timer to fire in a second
// so that Alice's link queries for a new network fee.
n.aliceChannelLink.updateFeeTimer.Reset(time.Millisecond)
startingFeeRate := channels.aliceToBob.CommitFeeRate()
@@ -3480,20 +3469,15 @@ func TestChannelLinkUpdateCommitFee(t *testing.T) {
select {
case n.feeEstimator.byteFeeIn <- startingFeeRateSatPerVByte:
case <-time.After(time.Second * 5):
t.Fatalf("alice didn't query for the new " +
"network fee")
t.Fatalf("alice didn't query for the new network fee")
}
time.Sleep(time.Millisecond * 500)
time.Sleep(time.Second)
// The fee rate on the alice <-> bob channel should still be the same
// on both sides.
aliceFeeRate := channels.aliceToBob.CommitFeeRate()
bobFeeRate := channels.bobToAlice.CommitFeeRate()
if aliceFeeRate != bobFeeRate {
t.Fatalf("fee rates don't match: expected %v got %v",
aliceFeeRate, bobFeeRate)
}
if aliceFeeRate != startingFeeRate {
t.Fatalf("alice's fee rate shouldn't have changed: "+
"expected %v, got %v", aliceFeeRate, startingFeeRate)
@@ -3503,22 +3487,9 @@ func TestChannelLinkUpdateCommitFee(t *testing.T) {
"expected %v, got %v", bobFeeRate, startingFeeRate)
}
// Now we'll send a new block update to all end points, with a new
// height THAT'S OVER 9000!!!
select {
case n.aliceBlockEpoch <- &chainntnfs.BlockEpoch{
Height: 9001,
}:
case <-time.After(time.Second * 5):
t.Fatalf("link didn't read block epoch")
}
select {
case n.bobFirstBlockEpoch <- &chainntnfs.BlockEpoch{
Height: 9001,
}:
case <-time.After(time.Second * 5):
t.Fatalf("link didn't read block epoch")
}
// We'll reset the timer once again to ensure Alice's link queries for a
// new network fee.
n.aliceChannelLink.updateFeeTimer.Reset(time.Millisecond)
// Next, we'll set up a deliver a fee rate that's triple the current
// fee rate. This should cause the Alice (the initiator) to trigger a
@@ -3527,11 +3498,10 @@ func TestChannelLinkUpdateCommitFee(t *testing.T) {
select {
case n.feeEstimator.byteFeeIn <- startingFeeRateSatPerVByte * 3:
case <-time.After(time.Second * 5):
t.Fatalf("alice didn't query for the new " +
"network fee")
t.Fatalf("alice didn't query for the new network fee")
}
time.Sleep(time.Second * 2)
time.Sleep(time.Second)
// At this point, Alice should've triggered a new fee update that
// increased the fee rate to match the new rate.
@@ -3545,10 +3515,6 @@ func TestChannelLinkUpdateCommitFee(t *testing.T) {
t.Fatalf("bob's fee rate didn't change: expected %v, got %v",
newFeeRate, aliceFeeRate)
}
if aliceFeeRate != bobFeeRate {
t.Fatalf("fee rates don't match: expected %v got %v",
aliceFeeRate, bobFeeRate)
}
}
// TestChannelLinkAcceptDuplicatePayment tests that if a link receives an
@@ -3917,9 +3883,11 @@ func restartLink(aliceChannel *lnwallet.LightningChannel, aliceSwitch *Switch,
BlockEpochs: globalEpoch,
BatchTicker: ticker,
FwdPkgGCTicker: NewBatchTicker(time.NewTicker(5 * time.Second)),
// Make the BatchSize large enough to not
// trigger commit update automatically during tests.
BatchSize: 10000,
// Make the BatchSize and Min/MaxFeeUpdateTimeout large enough
// to not trigger commit updates automatically during tests.
BatchSize: 10000,
MinFeeUpdateTimeout: 30 * time.Minute,
MaxFeeUpdateTimeout: 30 * time.Minute,
// Set any hodl flags requested for the new link.
HodlMask: hodl.MaskFromFlags(hodlFlags...),
DebugHTLC: len(hodlFlags) > 0,
@@ -882,6 +882,12 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel,
quit: make(chan struct{}),
}
const (
batchTimeout = 50 * time.Millisecond
fwdPkgTimeout = 5 * time.Second
feeUpdateTimeout = 30 * time.Minute
)
pCache := &mockPreimageCache{
// hash -> preimage
preimageMap: make(map[[32]byte][]byte),
@@ -921,11 +927,14 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel,
UpdateContractSignals: func(*contractcourt.ContractSignals) error {
return nil
},
ChainEvents: &contractcourt.ChainEventSubscription{},
SyncStates: true,
BatchTicker: &mockTicker{aliceTicker.C},
FwdPkgGCTicker: &mockTicker{time.NewTicker(5 * time.Second).C},
BatchSize: 10,
ChainEvents: &contractcourt.ChainEventSubscription{},
SyncStates: true,
BatchSize: 10,
BatchTicker: &mockTicker{time.NewTicker(batchTimeout).C},
FwdPkgGCTicker: &mockTicker{time.NewTicker(fwdPkgTimeout).C},
MinFeeUpdateTimeout: feeUpdateTimeout,
MaxFeeUpdateTimeout: feeUpdateTimeout,
OnChannelFailure: func(lnwire.ChannelID, lnwire.ShortChannelID, LinkFailureError) {},
},
aliceChannel,
startingHeight,
@@ -970,11 +979,14 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel,
UpdateContractSignals: func(*contractcourt.ContractSignals) error {
return nil
},
ChainEvents: &contractcourt.ChainEventSubscription{},
SyncStates: true,
BatchTicker: &mockTicker{firstBobTicker.C},
FwdPkgGCTicker: &mockTicker{time.NewTicker(5 * time.Second).C},
BatchSize: 10,
ChainEvents: &contractcourt.ChainEventSubscription{},
SyncStates: true,
BatchSize: 10,
BatchTicker: &mockTicker{time.NewTicker(batchTimeout).C},
FwdPkgGCTicker: &mockTicker{time.NewTicker(fwdPkgTimeout).C},
MinFeeUpdateTimeout: feeUpdateTimeout,
MaxFeeUpdateTimeout: feeUpdateTimeout,
OnChannelFailure: func(lnwire.ChannelID, lnwire.ShortChannelID, LinkFailureError) {},
},
firstBobChannel,
startingHeight,
@@ -1019,11 +1031,14 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel,
UpdateContractSignals: func(*contractcourt.ContractSignals) error {
return nil
},
ChainEvents: &contractcourt.ChainEventSubscription{},
SyncStates: true,
BatchTicker: &mockTicker{secondBobTicker.C},
FwdPkgGCTicker: &mockTicker{time.NewTicker(5 * time.Second).C},
BatchSize: 10,
ChainEvents: &contractcourt.ChainEventSubscription{},
SyncStates: true,
BatchSize: 10,
BatchTicker: &mockTicker{time.NewTicker(batchTimeout).C},
FwdPkgGCTicker: &mockTicker{time.NewTicker(fwdPkgTimeout).C},
MinFeeUpdateTimeout: feeUpdateTimeout,
MaxFeeUpdateTimeout: feeUpdateTimeout,
OnChannelFailure: func(lnwire.ChannelID, lnwire.ShortChannelID, LinkFailureError) {},
},
secondBobChannel,
startingHeight,
@@ -1068,11 +1083,14 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel,
UpdateContractSignals: func(*contractcourt.ContractSignals) error {
return nil
},
ChainEvents: &contractcourt.ChainEventSubscription{},
SyncStates: true,
BatchTicker: &mockTicker{carolTicker.C},
FwdPkgGCTicker: &mockTicker{time.NewTicker(5 * time.Second).C},
BatchSize: 10,
ChainEvents: &contractcourt.ChainEventSubscription{},
SyncStates: true,
BatchSize: 10,
BatchTicker: &mockTicker{time.NewTicker(batchTimeout).C},
FwdPkgGCTicker: &mockTicker{time.NewTicker(fwdPkgTimeout).C},
MinFeeUpdateTimeout: feeUpdateTimeout,
MaxFeeUpdateTimeout: feeUpdateTimeout,
OnChannelFailure: func(lnwire.ChannelID, lnwire.ShortChannelID, LinkFailureError) {},
},
carolChannel,
startingHeight,
20 peer.go
@@ -1,6 +1,7 @@
package main
import (
"bytes"
"container/list"
"fmt"
"net"
@@ -9,14 +10,11 @@ import (
"time"
"github.com/davecgh/go-spew/spew"
"github.com/lightningnetwork/lnd/brontide"
"github.com/lightningnetwork/lnd/contractcourt"
"bytes"
"github.com/go-errors/errors"
"github.com/lightningnetwork/lnd/brontide"
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/contractcourt"
"github.com/lightningnetwork/lnd/htlcswitch"
"github.com/lightningnetwork/lnd/lnrpc"
"github.com/lightningnetwork/lnd/lnwallet"
@@ -551,11 +549,15 @@ func (p *peer) addLink(chanPoint *wire.OutPoint,
time.NewTicker(50 * time.Millisecond)),
FwdPkgGCTicker: htlcswitch.NewBatchTicker(
time.NewTicker(time.Minute)),
BatchSize: 10,
UnsafeReplay: cfg.UnsafeReplay,
BatchSize: 10,
UnsafeReplay: cfg.UnsafeReplay,
MinFeeUpdateTimeout: htlcswitch.DefaultMinLinkFeeUpdateTimeout,
MaxFeeUpdateTimeout: htlcswitch.DefaultMaxLinkFeeUpdateTimeout,
}
link := htlcswitch.NewChannelLink(linkCfg, lnChan,
uint32(currentHeight))
link := htlcswitch.NewChannelLink(
linkCfg, lnChan, uint32(currentHeight),
)
// With the channel link created, we'll now notify the htlc switch so
// this channel can be used to dispatch local payments and also

0 comments on commit 4cc6049

Please sign in to comment.