Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

discovery: flag flip no historical gossip #3359

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions config.go
Expand Up @@ -308,6 +308,8 @@ type config struct {
NumGraphSyncPeers int `long:"numgraphsyncpeers" description:"The number of peers that we should receive new graph updates from. This option can be tuned to save bandwidth for light clients or routing nodes."`
HistoricalSyncInterval time.Duration `long:"historicalsyncinterval" description:"The polling interval between historical graph sync attempts. Each historical graph sync attempt ensures we reconcile with the remote peer's graph from the genesis block."`

IgnoreHistoricalGossipFilters bool `long:"ignore-historical-gossip-filters" description:"If true, will not reply with historical data that matches the range specified by a remote peer's gossip_timestamp_filter. Doing so will result in lower memory and bandwidth requirements."`

RejectPush bool `long:"rejectpush" description:"If true, lnd will not accept channel opening requests with non-zero push amounts. This should prevent accidental pushes to merchant nodes."`

StaggerInitialReconnect bool `long:"stagger-initial-reconnect" description:"If true, will apply a randomized staggering between 0s and 30s when reconnecting to persistent peers on startup. The first 10 reconnections will be attempted instantly, regardless of the flag's value"`
Expand Down
17 changes: 12 additions & 5 deletions discovery/gossiper.go
Expand Up @@ -211,6 +211,12 @@ type Config struct {
// SubBatchDelay is the delay between sending sub batches of
// gossip messages.
SubBatchDelay time.Duration

// IgnoreHistoricalFilters will prevent syncers from replying with
// historical data when the remote peer sets a gossip_timestamp_range.
// This prevents ranges with old start times from causing us to dump the
// graph on connect.
IgnoreHistoricalFilters bool
}

// AuthenticatedGossiper is a subsystem which is responsible for receiving
Expand Down Expand Up @@ -313,11 +319,12 @@ func New(cfg Config, selfKey *btcec.PublicKey) *AuthenticatedGossiper {
channelMtx: multimutex.NewMutex(),
recentRejects: make(map[uint64]struct{}),
syncMgr: newSyncManager(&SyncManagerCfg{
ChainHash: cfg.ChainHash,
ChanSeries: cfg.ChanSeries,
RotateTicker: cfg.RotateTicker,
HistoricalSyncTicker: cfg.HistoricalSyncTicker,
NumActiveSyncers: cfg.NumActiveSyncers,
ChainHash: cfg.ChainHash,
ChanSeries: cfg.ChanSeries,
RotateTicker: cfg.RotateTicker,
HistoricalSyncTicker: cfg.HistoricalSyncTicker,
NumActiveSyncers: cfg.NumActiveSyncers,
IgnoreHistoricalFilters: cfg.IgnoreHistoricalFilters,
}),
}

Expand Down
7 changes: 7 additions & 0 deletions discovery/sync_manager.go
Expand Up @@ -82,6 +82,12 @@ type SyncManagerCfg struct {
// SyncManager when it should attempt a historical sync with a gossip
// sync peer.
HistoricalSyncTicker ticker.Ticker

// IgnoreHistoricalFilters will prevent syncers from replying with
// historical data when the remote peer sets a gossip_timestamp_range.
// This prevents ranges with old start times from causing us to dump the
// graph on connect.
IgnoreHistoricalFilters bool
}

// SyncManager is a subsystem of the gossiper that manages the gossip syncers
Expand Down Expand Up @@ -400,6 +406,7 @@ func (m *SyncManager) createGossipSyncer(peer lnpeer.Peer) *GossipSyncer {
sendToPeerSync: func(msgs ...lnwire.Message) error {
return peer.SendMessageLazy(true, msgs...)
},
ignoreHistoricalFilters: m.cfg.IgnoreHistoricalFilters,
})

// Gossip syncers are initialized by default in a PassiveSync type
Expand Down
12 changes: 12 additions & 0 deletions discovery/syncer.go
Expand Up @@ -235,6 +235,12 @@ type gossipSyncerCfg struct {
// replyHandler, meaning we will not reply to queries from our remote
// peer.
noReplyQueries bool

// ignoreHistoricalFilters will prevent syncers from replying with
// historical data when the remote peer sets a gossip_timestamp_range.
// This prevents ranges with old start times from causing us to dump the
// graph on connect.
ignoreHistoricalFilters bool
}

// GossipSyncer is a struct that handles synchronizing the channel graph state
Expand Down Expand Up @@ -951,6 +957,12 @@ func (g *GossipSyncer) ApplyGossipFilter(filter *lnwire.GossipTimestampRange) er

g.Unlock()

// If requested, don't reply with historical gossip data when the remote
// peer sets their gossip timestamp range.
if g.cfg.ignoreHistoricalFilters {
return nil
}

// Now that the remote peer has applied their filter, we'll query the
// database for all the messages that are beyond this filter.
newUpdatestoSend, err := g.cfg.channelSeries.UpdatesInHorizon(
Expand Down
62 changes: 62 additions & 0 deletions discovery/syncer_test.go
@@ -1,8 +1,10 @@
package discovery

import (
"errors"
"math"
"reflect"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -336,6 +338,66 @@ func TestGossipSyncerFilterGossipMsgsAllInMemory(t *testing.T) {
}
}

// TestGossipSyncerApplyNoHistoricalGossipFilter tests that once a gossip filter
// is applied for the remote peer, then we don't send the peer all known
// messages which are within their desired time horizon.
func TestGossipSyncerApplyNoHistoricalGossipFilter(t *testing.T) {
t.Parallel()

// First, we'll create a GossipSyncer instance with a canned sendToPeer
// message to allow us to intercept their potential sends.
_, syncer, chanSeries := newTestSyncer(
lnwire.NewShortChanIDFromInt(10), defaultEncoding,
defaultChunkSize,
)
syncer.cfg.ignoreHistoricalFilters = true

// We'll apply this gossip horizon for the remote peer.
remoteHorizon := &lnwire.GossipTimestampRange{
FirstTimestamp: unixStamp(25000),
TimestampRange: uint32(1000),
}

// After applying the gossip filter, the chan series should not be
// queried using the updated horizon.
errChan := make(chan error, 1)
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()

select {
// No query received, success.
case <-time.After(3 * time.Second):
errChan <- nil

// Unexpected query received.
case <-chanSeries.horizonReq:
errChan <- errors.New("chan series should not have been " +
"queried")
}
}()

// We'll now attempt to apply the gossip filter for the remote peer.
syncer.ApplyGossipFilter(remoteHorizon)

// Ensure that the syncer's remote horizon was properly updated.
if !reflect.DeepEqual(syncer.remoteUpdateHorizon, remoteHorizon) {
t.Fatalf("expected remote horizon: %v, got: %v",
remoteHorizon, syncer.remoteUpdateHorizon)
}

// Wait for the query check to finish.
wg.Wait()

// Assert that no query was made as a result of applying the gossip
// filter.
err := <-errChan
if err != nil {
t.Fatalf(err.Error())
}
}

// TestGossipSyncerApplyGossipFilter tests that once a gossip filter is applied
// for the remote peer, then we send the peer all known messages which are
// within their desired time horizon.
Expand Down
37 changes: 19 additions & 18 deletions server.go
Expand Up @@ -712,24 +712,25 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB,
}

s.authGossiper = discovery.New(discovery.Config{
Router: s.chanRouter,
Notifier: s.cc.chainNotifier,
ChainHash: *activeNetParams.GenesisHash,
Broadcast: s.BroadcastMessage,
ChanSeries: chanSeries,
NotifyWhenOnline: s.NotifyWhenOnline,
NotifyWhenOffline: s.NotifyWhenOffline,
ProofMatureDelta: 0,
TrickleDelay: time.Millisecond * time.Duration(cfg.TrickleDelay),
RetransmitDelay: time.Minute * 30,
WaitingProofStore: waitingProofStore,
MessageStore: gossipMessageStore,
AnnSigner: s.nodeSigner,
RotateTicker: ticker.New(discovery.DefaultSyncerRotationInterval),
HistoricalSyncTicker: ticker.New(cfg.HistoricalSyncInterval),
NumActiveSyncers: cfg.NumGraphSyncPeers,
MinimumBatchSize: 10,
SubBatchDelay: time.Second * 5,
Router: s.chanRouter,
Notifier: s.cc.chainNotifier,
ChainHash: *activeNetParams.GenesisHash,
Broadcast: s.BroadcastMessage,
ChanSeries: chanSeries,
NotifyWhenOnline: s.NotifyWhenOnline,
NotifyWhenOffline: s.NotifyWhenOffline,
ProofMatureDelta: 0,
TrickleDelay: time.Millisecond * time.Duration(cfg.TrickleDelay),
RetransmitDelay: time.Minute * 30,
WaitingProofStore: waitingProofStore,
MessageStore: gossipMessageStore,
AnnSigner: s.nodeSigner,
RotateTicker: ticker.New(discovery.DefaultSyncerRotationInterval),
HistoricalSyncTicker: ticker.New(cfg.HistoricalSyncInterval),
NumActiveSyncers: cfg.NumGraphSyncPeers,
MinimumBatchSize: 10,
SubBatchDelay: time.Second * 5,
IgnoreHistoricalFilters: cfg.IgnoreHistoricalGossipFilters,
},
s.identityPriv.PubKey(),
)
Expand Down