discovery: revamp premature update map #5902

Merged
3 commits merged on Nov 4, 2021
182 changes: 136 additions & 46 deletions discovery/gossiper.go
@@ -12,6 +12,8 @@ import (
"github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btcutil"
"github.com/davecgh/go-spew/spew"
"github.com/lightninglabs/neutrino/cache"
"github.com/lightninglabs/neutrino/cache/lru"
"github.com/lightningnetwork/lnd/batch"
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/channeldb"
@@ -38,6 +40,15 @@ const (
// determine how often we should allow a new update for a specific
// channel and direction.
DefaultChannelUpdateInterval = time.Minute

// maxPrematureUpdates tracks the maximum number of premature channel
// updates that we'll hold onto.
maxPrematureUpdates = 100

// maxRejectedUpdates tracks the maximum number of rejected channel
// updates we'll maintain. This is the global size across all peers. We'll
// allocate ~3 MB max to the cache.
maxRejectedUpdates = 10_000
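//
// A rough back-of-envelope estimate of the ~3 MB figure: each entry
// carries a 41-byte key (33-byte pubkey plus 8-byte channel ID) plus
// map and list bookkeeping, on the order of 300 bytes in total, so
// 10,000 entries come to roughly 3 MB.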
)

var (
@@ -268,6 +279,46 @@ type Config struct {
ChannelUpdateInterval time.Duration
}

// cachedNetworkMsg is a wrapper around a network message that can be used with
// *lru.Cache.
type cachedNetworkMsg struct {
msgs []*networkMsg
}

// Size returns the "size" of an entry. We return the number of items as we
// just want to limit the total number of entries rather than do accurate
// size accounting.
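// As a consequence, the cache capacity (maxPrematureUpdates) bounds the
// total number of buffered updates across all channels, not the number
// of distinct channels with pending updates.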
func (c *cachedNetworkMsg) Size() (uint64, error) {
return uint64(len(c.msgs)), nil
}

// rejectCacheKey is the cache key that we'll use to track announcements we've
// recently rejected.
type rejectCacheKey struct {
pubkey [33]byte
chanID uint64
}

// newRejectCacheKey returns a new cache key for the reject cache.
func newRejectCacheKey(cid uint64, pub [33]byte) rejectCacheKey {
k := rejectCacheKey{
chanID: cid,
pubkey: pub,
}

return k
}

// cachedReject is an empty struct used as the cache value for announcements
// we've recently rejected.
type cachedReject struct {
}

// Size returns the "size" of an entry. We return 1 as we just want to limit
// the total number of entries.
func (c *cachedReject) Size() (uint64, error) {
return 1, nil
}
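To make the plumbing concrete, here is a minimal, hypothetical sketch of the reject cache in isolation. It reuses the rejectCacheKey, newRejectCacheKey, and cachedReject definitions above (assumed to be in scope); the capacity of 10 and the channel ID are illustrative values, not the PR's:

package main

import (
	"fmt"

	"github.com/lightninglabs/neutrino/cache"
	"github.com/lightninglabs/neutrino/cache/lru"
)

func main() {
	// Each cachedReject reports Size() == 1, so a capacity of 10
	// admits at most 10 reject entries before LRU eviction begins.
	rejects := lru.NewCache(10)

	var peerPub [33]byte // the announcing peer's serialized pubkey
	key := newRejectCacheKey(123456, peerPub)

	// Record a rejection for this (channel, peer) pair.
	_, _ = rejects.Put(key, &cachedReject{})

	// Membership probe: any result other than ErrElementNotFound
	// means this announcement was rejected recently.
	_, err := rejects.Get(key)
	fmt.Println("recently rejected:", err != cache.ErrElementNotFound)
}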

// AuthenticatedGossiper is a subsystem which is responsible for receiving
// announcements, validating them and applying the changes to the router,
// syncing the lightning network with newly connected nodes, broadcasting
// announcements
@@ -302,8 +353,7 @@ type AuthenticatedGossiper struct {
// that wasn't associated with any channel we know about. We store
// them temporarily, such that we can reprocess them when a
// ChannelAnnouncement for the channel is received.
prematureChannelUpdates map[uint64][]*networkMsg
pChanUpdMtx sync.Mutex
prematureChannelUpdates *lru.Cache

// networkMsgs is a channel that carries new network broadcasted
// message from outside the gossiper service to be processed by the
@@ -327,8 +377,7 @@ type AuthenticatedGossiper struct {
// consistent between when the DB is first read until it's written.
channelMtx *multimutex.Mutex

rejectMtx sync.RWMutex
recentRejects map[uint64]struct{}
recentRejects *lru.Cache

// syncMgr is a subsystem responsible for managing the gossip syncers
// for peers currently connected. When a new peer is connected, the
@@ -368,9 +417,9 @@ func New(cfg Config, selfKeyDesc *keychain.KeyDescriptor) *AuthenticatedGossiper
networkMsgs: make(chan *networkMsg),
quit: make(chan struct{}),
chanPolicyUpdates: make(chan *chanPolicyUpdateRequest),
prematureChannelUpdates: make(map[uint64][]*networkMsg),
prematureChannelUpdates: lru.NewCache(maxPrematureUpdates),
channelMtx: multimutex.NewMutex(),
recentRejects: make(map[uint64]struct{}),
recentRejects: lru.NewCache(maxRejectedUpdates),
chanUpdateRateLimiter: make(map[uint64][2]*rate.Limiter),
}

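Because both structures are now bounded LRU caches rather than plain maps, stale entries fall out automatically under pressure. Continuing the hypothetical sketch above, a small illustration of that eviction behavior, assuming Put's first return value (discarded elsewhere in this diff) reports whether an eviction occurred:

c := lru.NewCache(2) // room for two Size()==1 entries

_, _ = c.Put("a", &cachedReject{})
_, _ = c.Put("b", &cachedReject{})

// Touching "a" makes "b" the least recently used entry.
_, _ = c.Get("a")

// A third insert exceeds the capacity and evicts "b".
evicted, _ := c.Put("c", &cachedReject{})
fmt.Println(evicted) // true

_, err := c.Get("b")
fmt.Println(err == cache.ErrElementNotFound) // true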
@@ -1018,7 +1067,9 @@ func (d *AuthenticatedGossiper) networkHandler() {

// If this message was recently rejected, then we won't
// attempt to re-process it.
if d.isRecentlyRejectedMsg(announcement.msg) {
if announcement.isRemote && d.isRecentlyRejectedMsg(
announcement.msg, announcement.peer.PubKey(),
) {
announcement.err <- fmt.Errorf("recently " +
"rejected")
continue
@@ -1184,22 +1235,23 @@ func (d *AuthenticatedGossiper) PruneSyncState(peer route.Vertex) {

// isRecentlyRejectedMsg returns true if we recently rejected a message, and
// false otherwise. This avoids expensive reprocessing of the message.
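// Rejects are tracked per (channel, peer) pair, so one peer's invalid
// announcement doesn't suppress reprocessing of the same channel when
// it arrives from a different peer.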
func (d *AuthenticatedGossiper) isRecentlyRejectedMsg(msg lnwire.Message) bool {
d.rejectMtx.RLock()
defer d.rejectMtx.RUnlock()
func (d *AuthenticatedGossiper) isRecentlyRejectedMsg(msg lnwire.Message,
peerPub [33]byte) bool {

var scid uint64
switch m := msg.(type) {
case *lnwire.ChannelUpdate:
_, ok := d.recentRejects[m.ShortChannelID.ToUint64()]
return ok
scid = m.ShortChannelID.ToUint64()

case *lnwire.ChannelAnnouncement:
_, ok := d.recentRejects[m.ShortChannelID.ToUint64()]
return ok
scid = m.ShortChannelID.ToUint64()

default:
return false
}

_, err := d.recentRejects.Get(newRejectCacheKey(scid, peerPub))
return err != cache.ErrElementNotFound
}

// retransmitStaleAnns examines all outgoing channels that the source node is
@@ -1622,9 +1674,11 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
d.cfg.ChainHash)
log.Errorf(err.Error())

d.rejectMtx.Lock()
d.recentRejects[msg.ShortChannelID.ToUint64()] = struct{}{}
d.rejectMtx.Unlock()
key := newRejectCacheKey(
msg.ShortChannelID.ToUint64(),
nMsg.peer.PubKey(),
)
_, _ = d.recentRejects.Put(key, &cachedReject{})

nMsg.err <- err
return nil, false
@@ -1662,9 +1716,12 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
if err := routing.ValidateChannelAnn(msg); err != nil {
err := fmt.Errorf("unable to validate "+
"announcement: %v", err)
d.rejectMtx.Lock()
d.recentRejects[msg.ShortChannelID.ToUint64()] = struct{}{}
d.rejectMtx.Unlock()

key := newRejectCacheKey(
msg.ShortChannelID.ToUint64(),
nMsg.peer.PubKey(),
)
_, _ = d.recentRejects.Put(key, &cachedReject{})

log.Error(err)
nMsg.err <- err
@@ -1735,9 +1792,13 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
// see if we get any new announcements.
anns, rErr := d.processRejectedEdge(msg, proof)
if rErr != nil {
d.rejectMtx.Lock()
d.recentRejects[msg.ShortChannelID.ToUint64()] = struct{}{}
d.rejectMtx.Unlock()

key := newRejectCacheKey(
msg.ShortChannelID.ToUint64(),
nMsg.peer.PubKey(),
)
_, _ = d.recentRejects.Put(key, &cachedReject{})

nMsg.err <- rErr
return nil, false
}
@@ -1759,9 +1820,11 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
log.Tracef("Router rejected channel "+
"edge: %v", err)

d.rejectMtx.Lock()
d.recentRejects[msg.ShortChannelID.ToUint64()] = struct{}{}
d.rejectMtx.Unlock()
key := newRejectCacheKey(
msg.ShortChannelID.ToUint64(),
nMsg.peer.PubKey(),
)
_, _ = d.recentRejects.Put(key, &cachedReject{})
}

nMsg.err <- err
@@ -1774,13 +1837,14 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
shortChanID := msg.ShortChannelID.ToUint64()
var channelUpdates []*networkMsg

d.pChanUpdMtx.Lock()
channelUpdates = append(channelUpdates, d.prematureChannelUpdates[shortChanID]...)

// Now delete the premature ChannelUpdates, since we added them
// all to the queue of network messages.
delete(d.prematureChannelUpdates, shortChanID)
d.pChanUpdMtx.Unlock()
earlyChanUpdates, err := d.prematureChannelUpdates.Get(shortChanID)
if err == nil {
// There was actually an entry in the cache, so we'll
// accumulate it. We don't worry about deletion, since
// it'll eventually fall out anyway.
chanMsgs := earlyChanUpdates.(*cachedNetworkMsg)
channelUpdates = append(channelUpdates, chanMsgs.msgs...)
}

// Launch a new goroutine to handle each ChannelUpdate; this is to
// ensure we don't block here, as we can handle only one
@@ -1843,9 +1907,11 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
d.cfg.ChainHash)
log.Errorf(err.Error())

d.rejectMtx.Lock()
d.recentRejects[msg.ShortChannelID.ToUint64()] = struct{}{}
d.rejectMtx.Unlock()
key := newRejectCacheKey(
msg.ShortChannelID.ToUint64(),
nMsg.peer.PubKey(),
)
_, _ = d.recentRejects.Put(key, &cachedReject{})

nMsg.err <- err
return nil, false
@@ -1929,11 +1995,28 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
// of this, we temporarily add it to a cache, and
// reprocess it after our own ChannelAnnouncement has
// been processed.
d.pChanUpdMtx.Lock()
d.prematureChannelUpdates[shortChanID] = append(
d.prematureChannelUpdates[shortChanID], nMsg,
earlyMsgs, err := d.prematureChannelUpdates.Get(
shortChanID,
)
d.pChanUpdMtx.Unlock()
switch {
// Nothing in the cache yet, we can just directly
// insert this element.
case err == cache.ErrElementNotFound:
_, _ = d.prematureChannelUpdates.Put(
shortChanID, &cachedNetworkMsg{
msgs: []*networkMsg{nMsg},
})

// There's already something in the cache, so we'll
// combine the set of messages into a single value.
default:
msgs := earlyMsgs.(*cachedNetworkMsg).msgs
msgs = append(msgs, nMsg)
_, _ = d.prematureChannelUpdates.Put(
shortChanID, &cachedNetworkMsg{
msgs: msgs,
})
}

log.Debugf("Got ChannelUpdate for edge not found in "+
"graph(shortChanID=%v), saving for "+
@@ -1950,9 +2033,12 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
log.Error(err)
nMsg.err <- err

d.rejectMtx.Lock()
d.recentRejects[msg.ShortChannelID.ToUint64()] = struct{}{}
d.rejectMtx.Unlock()
key := newRejectCacheKey(
msg.ShortChannelID.ToUint64(),
nMsg.peer.PubKey(),
)
_, _ = d.recentRejects.Put(key, &cachedReject{})

return nil, false
}

@@ -2055,9 +2141,13 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
routing.ErrIgnored) {
log.Debug(err)
} else if err != routing.ErrVBarrierShuttingDown {
d.rejectMtx.Lock()
d.recentRejects[msg.ShortChannelID.ToUint64()] = struct{}{}
d.rejectMtx.Unlock()

key := newRejectCacheKey(
msg.ShortChannelID.ToUint64(),
nMsg.peer.PubKey(),
)
_, _ = d.recentRejects.Put(key, &cachedReject{})

log.Error(err)
}

2 changes: 2 additions & 0 deletions docs/release-notes/release-notes-0.14.0.md
@@ -367,6 +367,8 @@ messages directly. There is no routing/path finding involved.

* Using `go get` to install Go executables is now deprecated; our lnrpc proto dockerfile now uses `go install` instead [Migrate `go get` to `go install`](https://github.com/lightningnetwork/lnd/pull/5879)

* [The premature update map has been revamped using an LRU cache](https://github.com/lightningnetwork/lnd/pull/5902)

## Code Health

### Code cleanup, refactor, typo fixes