Skip to content

Commit

Permalink
multi: add flap count and last flap time to listpeers
Browse files Browse the repository at this point in the history
  • Loading branch information
carlaKC committed Sep 8, 2020
1 parent 8b78d46 commit 896804f
Show file tree
Hide file tree
Showing 6 changed files with 964 additions and 750 deletions.
88 changes: 88 additions & 0 deletions chanfitness/chaneventstore.go
Expand Up @@ -56,6 +56,9 @@ type ChannelEventStore struct {
// chanInfoRequests serves requests for information about our channel.
chanInfoRequests chan channelInfoRequest

// peerRequests serves requests for information about a peer.
peerRequests chan peerRequest

quit chan struct{}

wg sync.WaitGroup
Expand Down Expand Up @@ -108,6 +111,17 @@ type channelInfoResponse struct {
err error
}

type peerRequest struct {
peer route.Vertex
responseChan chan peerResponse
}

type peerResponse struct {
flapCount int
ts *time.Time
err error
}

// NewChannelEventStore initializes an event store with the config provided.
// Note that this function does not start the main event loop, Start() must be
// called.
Expand All @@ -116,6 +130,7 @@ func NewChannelEventStore(config *Config) *ChannelEventStore {
cfg: config,
peers: make(map[route.Vertex]peerMonitor),
chanInfoRequests: make(chan channelInfoRequest),
peerRequests: make(chan peerRequest),
quit: make(chan struct{}),
}

Expand Down Expand Up @@ -373,6 +388,15 @@ func (c *ChannelEventStore) consume(subscriptions *subscriptions) {
resp.info, resp.err = c.getChanInfo(req)
req.responseChan <- resp

// Serve all requests for information about our peer.
case req := <-c.peerRequests:
var resp peerResponse

resp.flapCount, resp.ts, resp.err = c.flapCount(
req.peer,
)
req.responseChan <- resp

case <-c.cfg.FlapCountTicker.Ticks():
if err := c.recordFlapCount(); err != nil {
log.Errorf("could not record flap "+
Expand Down Expand Up @@ -449,6 +473,70 @@ func (c *ChannelEventStore) getChanInfo(req channelInfoRequest) (*ChannelInfo,
}, nil
}

// FlapCount returns the flap count we have for a peer and the timestamp of its
// last flap. If we do not have any flaps recorded for the peer, the last flap
// timestamp will be nil.
func (c *ChannelEventStore) FlapCount(peer route.Vertex) (int, *time.Time,
error) {

request := peerRequest{
peer: peer,
responseChan: make(chan peerResponse),
}

// Send a request for the peer's information to the main event loop,
// or return early with an error if the store has already received a
// shutdown signal.
select {
case c.peerRequests <- request:
case <-c.quit:
return 0, nil, errShuttingDown
}

// Return the response we receive on the response channel or exit early
// if the store is instructed to exit.
select {
case resp := <-request.responseChan:
return resp.flapCount, resp.ts, resp.err

case <-c.quit:
return 0, nil, errShuttingDown
}
}

// flapCount gets our peer flap count and last flap timestamp from our in memory
// record of a peer, falling back to on disk if we are not currently tracking
// the peer. If we have no flap count recorded for the peer, a nil last flap
// time will be returned.
func (c *ChannelEventStore) flapCount(peer route.Vertex) (int, *time.Time,
error) {

// First check whether we are tracking this peer in memory, because this
// record will have the most accurate flap count. We do not fail if we
// can't find the peer in memory, because we may have previously
// recorded its flap count on disk.
peerMonitor, ok := c.peers[peer]
if ok {
count, ts := peerMonitor.getFlapCount()
return count, ts, nil
}

// Try to get our flap count from the database. If this value is not
// recorded, we return a nil last flap time to indicate that we have no
// record of the peer's flap count.
flapCount, err := c.cfg.ReadFlapCount(peer)
switch err {
case channeldb.ErrNoPeerBucket:
return 0, nil, nil

case nil:
return int(flapCount.Count), &flapCount.LastFlap, nil

default:
return 0, nil, err
}
}

// recordFlapCount will record our flap count for each peer that we are
// currently tracking, skipping peers that have a 0 flap count.
func (c *ChannelEventStore) recordFlapCount() error {
Expand Down
51 changes: 51 additions & 0 deletions chanfitness/chaneventstore_test.go
Expand Up @@ -290,3 +290,54 @@ func TestGetChanInfo(t *testing.T) {

ctx.stop()
}

// TestFlapCount tests querying the store for peer flap counts, covering the
// case where the peer is tracked in memory, and the case where we need to
// lookup the peer on disk.
func TestFlapCount(t *testing.T) {
clock := clock.NewTestClock(testNow)

var (
peer = route.Vertex{9, 9, 9}
peerFlapCount = 3
lastFlap = clock.Now()
)

// Create a test context with one peer's flap count already recorded,
// which mocks it already having its flap count stored on disk.
ctx := newChanEventStoreTestCtx(t)
ctx.flapUpdates[peer] = &channeldb.FlapCount{
Count: uint32(peerFlapCount),
LastFlap: lastFlap,
}

ctx.start()

// Create test variables for a peer and channel, but do not add it to
// our store yet.
peer1 := route.Vertex{1, 2, 3}

// First, query for a peer that we have no record of in memory or on
// disk and confirm that we indicate that the peer was not found.
_, ts, err := ctx.store.FlapCount(peer1)
require.NoError(t, err)
require.Nil(t, ts)

// Send an online event for our peer.
ctx.peerEvent(peer1, true)

// Assert that we now find a record of the peer with flap count = 1.
count, ts, err := ctx.store.FlapCount(peer1)
require.NoError(t, err)
require.Equal(t, lastFlap, *ts)
require.Equal(t, 1, count)

// Make a request for our peer that not tracked in memory, but does
// have its flap count stored on disk.
count, ts, err = ctx.store.FlapCount(peer)
require.NoError(t, err)
require.Equal(t, lastFlap, *ts)
require.Equal(t, peerFlapCount, count)

ctx.stop()
}

0 comments on commit 896804f

Please sign in to comment.