Skip to content

Commit

Permalink
multi: unify channel info request and expose flap count
Browse files Browse the repository at this point in the history
When we list channels, we get lifetime, uptime and now flap count from
the channel event store. This call unifies the values that external
callers will want (rather than splitting out a third request type just
for flaps) and exposes flap count over rpc.

We hardcode calculating uptime for the channel's whole life because
the listChannels endpoint doesn't start start/end params. Since we plan
on adding a separate uptime endpoint once we have persisted uptime, it's
ok to remove these parameters.
  • Loading branch information
carlaKC committed Aug 3, 2020
1 parent 3a81012 commit e36ace6
Show file tree
Hide file tree
Showing 6 changed files with 860 additions and 950 deletions.
165 changes: 66 additions & 99 deletions chanfitness/chaneventstore.go
Expand Up @@ -47,11 +47,8 @@ type ChannelEventStore struct {
// and offline events.
peers map[route.Vertex]bool

// lifespanRequests serves requests for the lifespan of channels.
lifespanRequests chan lifespanRequest

// uptimeRequests serves requests for the uptime of channels.
uptimeRequests chan uptimeRequest
// chanInfoRequests serves requests for information about our channel.
chanInfoRequests chan channelInfoRequest

quit chan struct{}

Expand Down Expand Up @@ -94,36 +91,16 @@ type Config struct {
FlapCountTicker ticker.Ticker
}

// lifespanRequest contains the channel ID required to query the store for a
// channel's lifespan and a blocking response channel on which the result is
// sent.
type lifespanRequest struct {
channelPoint wire.OutPoint
responseChan chan lifespanResponse
}

// lifespanResponse contains the response to a lifespanRequest and an error if
// one occurred.
type lifespanResponse struct {
start time.Time
end time.Time
err error
}

// uptimeRequest contains the parameters required to query the store for a
// channel's uptime and a blocking response channel on which the result is sent.
type uptimeRequest struct {
type channelInfoRequest struct {
channelPoint wire.OutPoint
startTime time.Time
endTime time.Time
responseChan chan uptimeResponse
start time.Time
end time.Time
responseChan chan channelInfoResponse
}

// uptimeResponse contains the response to an uptimeRequest and an error if one
// occurred.
type uptimeResponse struct {
uptime time.Duration
err error
type channelInfoResponse struct {
info *ChannelInfo
err error
}

// NewChannelEventStore initializes an event store with the config provided.
Expand All @@ -134,8 +111,7 @@ func NewChannelEventStore(config *Config) *ChannelEventStore {
cfg: config,
channels: make(map[wire.OutPoint]*chanEventLog),
peers: make(map[route.Vertex]bool),
lifespanRequests: make(chan lifespanRequest),
uptimeRequests: make(chan uptimeRequest),
chanInfoRequests: make(chan channelInfoRequest),
quit: make(chan struct{}),
}

Expand Down Expand Up @@ -365,35 +341,10 @@ func (c *ChannelEventStore) consume(subscriptions *subscriptions) {
}

// Serve all requests for channel lifetime.
case req := <-c.lifespanRequests:
var resp lifespanResponse

channel, ok := c.channels[req.channelPoint]
if !ok {
resp.err = ErrChannelNotFound
} else {
resp.start = channel.openedAt
resp.end = channel.closedAt
}

req.responseChan <- resp

// Serve requests for channel uptime.
case req := <-c.uptimeRequests:
var resp uptimeResponse

channel, ok := c.channels[req.channelPoint]
if !ok {
resp.err = ErrChannelNotFound
} else {
uptime, err := channel.uptime(
req.startTime, req.endTime,
)

resp.uptime = uptime
resp.err = err
}
case req := <-c.chanInfoRequests:
var resp channelInfoResponse

resp.info, resp.err = c.getChanInfo(req)
req.responseChan <- resp

case <-c.cfg.FlapCountTicker.Ticks():
Expand All @@ -409,67 +360,83 @@ func (c *ChannelEventStore) consume(subscriptions *subscriptions) {
}
}

// GetLifespan returns the opening and closing time observed for a channel and
// a boolean to indicate whether the channel is known the the event store. If
// the channel is still open, a zero close time is returned.
func (c *ChannelEventStore) GetLifespan(
channelPoint wire.OutPoint) (time.Time, time.Time, error) {
// ChannelInfo provides the set of information that the event store has recorded
// for a channel.
type ChannelInfo struct {
// Lifetime is the total amount of time we have monitored the channel
// for.
Lifetime time.Duration

// Uptime is the total amount of time that the channel peer has been
// observed as online during the monitored lifespan.
Uptime time.Duration

// FlapCount is the number of times the channel peer has gone offline
// over our record of the peer. Note that these flaps are tracked across
// channels (some of these flaps may have been before this channel was
// opened).
FlapCount uint32
}

request := lifespanRequest{
// GetChanInfo gets all the information we have on a channel in the event store.
func (c *ChannelEventStore) GetChanInfo(channelPoint wire.OutPoint) (*ChannelInfo,
error) {

request := channelInfoRequest{
channelPoint: channelPoint,
responseChan: make(chan lifespanResponse),
start: time.Time{},
end: time.Time{},
responseChan: make(chan channelInfoResponse),
}

// Send a request for the channel's lifespan to the main event loop, or
// return early with an error if the store has already received a
// Send a request for the channel's information to the main event loop,
// or return early with an error if the store has already received a
// shutdown signal.
select {
case c.lifespanRequests <- request:
case c.chanInfoRequests <- request:
case <-c.quit:
return time.Time{}, time.Time{}, errShuttingDown
return nil, errShuttingDown
}

// Return the response we receive on the response channel or exit early
// if the store is instructed to exit.
select {
case resp := <-request.responseChan:
return resp.start, resp.end, resp.err
return resp.info, resp.err

case <-c.quit:
return time.Time{}, time.Time{}, errShuttingDown
return nil, errShuttingDown
}
}

// GetUptime returns the uptime of a channel over a period and an error if the
// channel cannot be found or the uptime calculation fails.
func (c *ChannelEventStore) GetUptime(channelPoint wire.OutPoint, startTime,
endTime time.Time) (time.Duration, error) {
// getChanInfo collects channel information for a channel. It gets uptime over
// the full life of the channel.
func (c *ChannelEventStore) getChanInfo(req channelInfoRequest) (*ChannelInfo,
error) {

request := uptimeRequest{
channelPoint: channelPoint,
startTime: startTime,
endTime: endTime,
responseChan: make(chan uptimeResponse),
// Look for the channel in our current set.
channel, ok := c.channels[req.channelPoint]
if !ok {
return nil, ErrChannelNotFound
}

// Send a request for the channel's uptime to the main event loop, or
// return early with an error if the store has already received a
// shutdown signal.
select {
case c.uptimeRequests <- request:
case <-c.quit:
return 0, errShuttingDown
// If our channel is not closed, we want to calculate uptime until the
// present.
endTime := channel.closedAt
if endTime.IsZero() {
endTime = c.cfg.Clock.Now()
}

// Return the response we receive on the response channel or exit early
// if the store is instructed to exit.
select {
case resp := <-request.responseChan:
return resp.uptime, resp.err

case <-c.quit:
return 0, errShuttingDown
uptime, err := channel.uptime(channel.openedAt, endTime)
if err != nil {
return nil, err
}

return &ChannelInfo{
Lifetime: endTime.Sub(channel.openedAt),
Uptime: uptime,
FlapCount: channel.flapCount,
}, nil
}

// recordFlapCount will record our flap count for each peer that we currently
Expand Down

0 comments on commit e36ace6

Please sign in to comment.