Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

discovery: introduce gossiper syncManager subsystem #2740

Merged
merged 13 commits into from Apr 3, 2019
Merged
Changes from 1 commit
Commits
File filter...
Filter file types
Jump to…
Jump to file or symbol
Failed to load files and symbols.

Always

Just for now

discovery: allow gossip syncer to perform historical syncs

In this commit, we introduce the ability for gossip syncers to perform
historical syncs. This allows us to reconcile any channels we're missing
that the remote peer has starting from the genesis block of the chain.
This commit serves as a prerequisite to the SyncManager, introduced in a
later commit, where we'll be able to make spot checks by performing
historical syncs with peers to ensure we have as much of the graph as
possible.
  • Loading branch information...
wpaulino committed Mar 29, 2019
commit 042241dc48b99e81e44d7673e092ab59872195d6
@@ -236,6 +236,20 @@ type GossipSyncer struct {
// machine behaves as expected.
syncTransitionReqs chan *syncTransitionReq

// historicalSyncReqs is a channel that serves as a signal for the
// gossip syncer to perform a historical sync. Theese can only be done
// once the gossip syncer is in a chansSynced state to ensure its state
// machine behaves as expected.
historicalSyncReqs chan struct{}

// genHistoricalChanRangeQuery when true signals to the gossip syncer
// that it should request the remote peer for all of its known channel
// IDs starting from the genesis block of the chain. This can only
// happen if the gossip syncer receives a request to attempt a
// historical sync. It can be unset if the syncer ever transitions from
// PassiveSync to ActiveSync.
genHistoricalChanRangeQuery bool

// gossipMsgs is a channel that all messages from the target peer will
// be sent over.
gossipMsgs chan lnwire.Message
@@ -291,6 +305,7 @@ func newGossipSyncer(cfg gossipSyncerCfg) *GossipSyncer {
cfg: cfg,
rateLimiter: rateLimiter,
syncTransitionReqs: make(chan *syncTransitionReq),
historicalSyncReqs: make(chan struct{}),
gossipMsgs: make(chan lnwire.Message, 100),
quit: make(chan struct{}),
}
@@ -338,7 +353,9 @@ func (g *GossipSyncer) channelGraphSyncer() {
case syncingChans:
// If we're in this state, then we'll send the remote
// peer our opening QueryChannelRange message.
queryRangeMsg, err := g.genChanRangeQuery()
queryRangeMsg, err := g.genChanRangeQuery(
g.genHistoricalChanRangeQuery,
)
if err != nil {
log.Errorf("unable to gen chan range "+
"query: %v", err)
@@ -481,6 +498,9 @@ func (g *GossipSyncer) channelGraphSyncer() {
case req := <-g.syncTransitionReqs:
req.errChan <- g.handleSyncTransition(req)

case <-g.historicalSyncReqs:
g.handleHistoricalSync()

case <-g.quit:
return
}
@@ -624,26 +644,29 @@ func (g *GossipSyncer) processChanRangeReply(msg *lnwire.ReplyChannelRange) erro

// genChanRangeQuery generates the initial message we'll send to the remote
// party when we're kicking off the channel graph synchronization upon
// connection.
func (g *GossipSyncer) genChanRangeQuery() (*lnwire.QueryChannelRange, error) {
// connection. The historicalQuery boolean can be used to generate a query from
// the genesis block of the chain.
func (g *GossipSyncer) genChanRangeQuery(
historicalQuery bool) (*lnwire.QueryChannelRange, error) {

// First, we'll query our channel graph time series for its highest
// known channel ID.
newestChan, err := g.cfg.channelSeries.HighestChanID(g.cfg.chainHash)
if err != nil {
return nil, err
}

// Once we have the chan ID of the newest, we'll obtain the block
// height of the channel, then subtract our default horizon to ensure
// we don't miss any channels. By default, we go back 1 day from the
// newest channel.
// Once we have the chan ID of the newest, we'll obtain the block height
// of the channel, then subtract our default horizon to ensure we don't
// miss any channels. By default, we go back 1 day from the newest
// channel, unless we're attempting a historical sync, where we'll
// actually start from the genesis block instead.
var startHeight uint32
switch {
case newestChan.BlockHeight <= chanRangeQueryBuffer:
case historicalQuery:
fallthrough
This conversation was marked as resolved by Roasbeef

This comment has been minimized.

Copy link
@cfromknecht

cfromknecht Apr 2, 2019

Collaborator

this case is captured by the above case

This comment has been minimized.

Copy link
@wpaulino

wpaulino Apr 2, 2019

Author Collaborator

Fixed.

case newestChan.BlockHeight == 0:
case newestChan.BlockHeight <= chanRangeQueryBuffer:
startHeight = 0

default:
startHeight = uint32(newestChan.BlockHeight - chanRangeQueryBuffer)
}
@@ -1080,6 +1103,10 @@ func (g *GossipSyncer) handleSyncTransition(req *syncTransitionReq) error {
timestampRange = math.MaxUint32
newState = syncingChans

This comment has been minimized.

Copy link
@Roasbeef

Roasbeef Apr 2, 2019

Member

Shouldn't this instead go to chanSynced, or is the idea now to combine the prior historical and active sync? So active is just "start from scratch to fill in the gaps and also being to receive new updates"?

This comment has been minimized.

Copy link
@wpaulino

wpaulino Apr 2, 2019

Author Collaborator

Since we don't query the remote peer for their channels when under a passive sync, I figured it should happen once it transitions to active.


// We'll set genHistoricalChanRangeQuery to false since in order
// to not perform another historical sync if we previously have.
g.genHistoricalChanRangeQuery = false

This comment has been minimized.

Copy link
@halseth

halseth Apr 3, 2019

Collaborator

This needs a comment

This comment has been minimized.

Copy link
@wpaulino

wpaulino Apr 3, 2019

Author Collaborator

Ended up removing the line completely. If it's done a historical sync before, then it shouldn't be a problem to do it again as it'll most likely be a NOP.


// If a PassiveSync transition has been requested, then we should no
// longer receive any new updates from the remote peer. We can do this
// by setting our update horizon to a range in the past ensuring no
@@ -1114,3 +1141,29 @@ func (g *GossipSyncer) setSyncType(syncType SyncerType) {
func (g *GossipSyncer) SyncType() SyncerType {
return SyncerType(atomic.LoadUint32(&g.syncType))
}

// historicalSync sends a request to the gossip syncer to perofmr a historical
// sync.
//
// NOTE: This can only be done once the gossip syncer has reached its final
// chansSynced state.
func (g *GossipSyncer) historicalSync() error {
select {
case g.historicalSyncReqs <- struct{}{}:
return nil
case <-time.After(syncTransitionTimeout):
return ErrSyncTransitionTimeout
case <-g.quit:
return ErrGossiperShuttingDown
}
}

// handleHistoricalSync handles a request to the gossip syncer to perform a
// historical sync.
func (g *GossipSyncer) handleHistoricalSync() {
This conversation was marked as resolved by wpaulino

This comment has been minimized.

Copy link
@halseth

halseth Apr 2, 2019

Collaborator

I like this! 😀

// We'll go back to our initial syncingChans state in order to request
// the remote peer to give us all of the channel IDs they know of
// starting from the genesis block.
g.genHistoricalChanRangeQuery = true
g.setSyncState(syncingChans)
}
@@ -730,7 +730,7 @@ func TestGossipSyncerGenChanRangeQuery(t *testing.T) {
// If we now ask the syncer to generate an initial range query, it
// should return a start height that's back chanRangeQueryBuffer
// blocks.
rangeQuery, err := syncer.genChanRangeQuery()
rangeQuery, err := syncer.genChanRangeQuery(false)
if err != nil {
t.Fatalf("unable to resp: %v", err)
}
@@ -743,7 +743,22 @@ func TestGossipSyncerGenChanRangeQuery(t *testing.T) {
}
if rangeQuery.NumBlocks != math.MaxUint32-firstHeight {
t.Fatalf("wrong num blocks: expected %v, got %v",
rangeQuery.NumBlocks, math.MaxUint32-firstHeight)
math.MaxUint32-firstHeight, rangeQuery.NumBlocks)
}

// Generating a historical range query should result in a start height
// of 0.
rangeQuery, err = syncer.genChanRangeQuery(true)
if err != nil {
t.Fatalf("unable to resp: %v", err)
}
if rangeQuery.FirstBlockHeight != 0 {
t.Fatalf("incorrect chan range query: expected %v, %v", 0,
rangeQuery.FirstBlockHeight)
}
if rangeQuery.NumBlocks != math.MaxUint32 {
t.Fatalf("wrong num blocks: expected %v, got %v",
math.MaxUint32, rangeQuery.NumBlocks)
}
}

@@ -2082,3 +2097,46 @@ func TestGossipSyncerSyncTransitions(t *testing.T) {
})
}
}

// TestGossipSyncerHistoricalSync tests that a gossip syncer can perform a
// historical sync with the remote peer.
func TestGossipSyncerHistoricalSync(t *testing.T) {
t.Parallel()

// We'll create a new gossip syncer and manually override its state to
// chansSynced. This is necessary as the syncer can only process
// historical sync requests in this state.
msgChan, syncer, _ := newTestSyncer(
lnwire.ShortChannelID{BlockHeight: latestKnownHeight},
defaultEncoding, defaultChunkSize,
)
syncer.setSyncType(PassiveSync)
syncer.setSyncState(chansSynced)

syncer.Start()
defer syncer.Stop()

syncer.historicalSync()

// We should expect to see a single lnwire.QueryChannelRange message be
// sent to the remote peer with a FirstBlockHeight of 0.
expectedMsg := &lnwire.QueryChannelRange{
FirstBlockHeight: 0,
NumBlocks: math.MaxUint32,
}

select {
case msgs := <-msgChan:
if len(msgs) != 1 {
t.Fatalf("expected to send a single "+
"lnwire.QueryChannelRange message, got %d",
len(msgs))
}
if !reflect.DeepEqual(msgs[0], expectedMsg) {
t.Fatalf("expected to send message: %v\ngot: %v",
spew.Sdump(expectedMsg), spew.Sdump(msgs[0]))
}
case <-time.After(time.Second):
t.Fatalf("expected to send a lnwire.QueryChannelRange message")
}
}
ProTip! Use n and p to navigate between commits in a pull request.
You can’t perform that action at this time.