diff --git a/chanfitness/chanevent.go b/chanfitness/chanevent.go new file mode 100644 index 000000000000..f2eb65c811c8 --- /dev/null +++ b/chanfitness/chanevent.go @@ -0,0 +1,122 @@ +package chanfitness + +import ( + "sort" + "sync" + "time" +) + +type eventType int + +const ( + channelOpenedEvent eventType = iota + 1 + channelClosedEvent +) + +// String provides string representations of channel events. +func (e eventType) String() string { + switch e { + case channelOpenedEvent: + return "channel_opened" + + case channelClosedEvent: + return "channel_closed" + } + return "unknown" +} + +// channelEvent is a a timestamped event which is observed on a per channel basie. +type channelEvent struct { + timestamp time.Time + eventType eventType +} + +// chanEventLog stores all events that have occurred over a channel's lifetime. +type chanEventLog struct { + // id is the uInt64 of the short channel ID. + id uint64 + + peer [33]byte + + // events is a log of timestamped events observed for the channel. + events []*channelEvent + + // now is expected to return the current time. It is supplied as an + // external function to enable deterministic unit tests. + now func() time.Time + + sync.Mutex +} + +func (e *chanEventLog) openChannel() { + e.add(channelOpenedEvent) +} + +func (e *chanEventLog) closeChannel() { + e.add(channelClosedEvent) +} + +// Add appends a timestamped event the event log. +func (e *chanEventLog) add(eventType eventType) { + e.Lock() + defer e.Unlock() + + e.events = append(e.events, &channelEvent{ + timestamp: time.Now(), + eventType: eventType, + }) + + log.Infof("Channel %v recording event: %v", e.id, eventType) +} + +// getLifespan returns the total observed lifetime of a channel's event log. +// If a channel is open, the lifespan is calculated from open time until the +// present. If the channel is closed, this lifespan terminates when the channel +// was closed. In absence of a channel opened event, the earliest entry in the +// event log is considered to be the opening time. +func (e *chanEventLog) getLifespan() (lifespan time.Duration) { + e.Lock() + defer e.Unlock() + + sort.Slice(e.events[:], func(i, j int) bool { + return e.events[i].timestamp.Before(e.events[j].timestamp) + }) + + // if there are no events, we cannot calculate lifetime. + if len(e.events) == 0 { + return 0 + } + + // Close time starts as the present time, but will be set back to the actual + // closing time if there is a close event present. + closeTime := e.now() + + var openTime time.Time + + for _, e := range e.events { + // set open time to the earliest timestamp in the event log. The earliest + // event is expected to be a channel opened event, but if it is not, it + // will be overriden by a subsequent open event, should one be present, + // because the events log is ordered by time for this loop. + if openTime.IsZero() || e.timestamp.Before(openTime) { + openTime = e.timestamp + } + + switch e.eventType { + case channelOpenedEvent: + openTime = e.timestamp + + case channelClosedEvent: + closeTime = e.timestamp + } + + } + + // if a close event was recorded before an open event, just return zero to + // prevent returning a negative lifespan. + if closeTime.Before(openTime) { + return 0 + } + + return closeTime.Sub(openTime) +} diff --git a/chanfitness/chanevent_test.go b/chanfitness/chanevent_test.go new file mode 100644 index 000000000000..219c4c343f13 --- /dev/null +++ b/chanfitness/chanevent_test.go @@ -0,0 +1,122 @@ +package chanfitness + +import ( + "testing" + "time" +) + +func TestGetLifeSpan(t *testing.T) { + now := time.Now() + + tests := []struct { + name string + events []*channelEvent + expectedLifespan time.Duration + }{ + { + name: "No events, zero lifespan", + }, + { + name: "Only opening event", + events: []*channelEvent{ + { + timestamp: now.Add(time.Hour * -1), + eventType: channelOpenedEvent, + }, + }, + expectedLifespan: time.Hour, + }, + { + name: "Open and close time", + events: []*channelEvent{ + { + timestamp: now.Add(time.Hour * -4), + eventType: channelOpenedEvent, + }, + { + timestamp: now.Add(time.Hour * -1), + eventType: channelClosedEvent, + }, + }, + expectedLifespan: time.Hour * 3, + }, + { + name: "Open and close time", + events: []*channelEvent{ + { + timestamp: now.Add(time.Hour * -4), + eventType: channelOpenedEvent, + }, + { + timestamp: now.Add(time.Hour * -1), + eventType: channelClosedEvent, + }, + }, + expectedLifespan: time.Hour * 3, + }, + { + name: "Close before open, return 0", + events: []*channelEvent{ + { + timestamp: now.Add(time.Hour * -1), + eventType: channelOpenedEvent, + }, + { + timestamp: now.Add(time.Hour * -4), + eventType: channelClosedEvent, + }, + }, + }, + { + name: "No open event, use earliest timestamp", + events: []*channelEvent{ + { + timestamp: now.Add(time.Hour * -2), + }, + { + timestamp: now.Add(time.Hour * -1), + eventType: channelClosedEvent, + }, + }, + expectedLifespan: time.Hour, + }, + { + name: "Event before open event, use open timestamp", + events: []*channelEvent{ + { + timestamp: now.Add(time.Hour * -3), + }, + { + timestamp: now.Add(time.Hour * -2), + eventType: channelOpenedEvent, + }, + { + timestamp: now.Add(time.Hour * -1), + eventType: channelClosedEvent, + }, + }, + expectedLifespan: time.Hour, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + eventLog := &chanEventLog{ + events: test.events, + now: func() time.Time { + return now + }, + } + + lifespan := eventLog.getLifespan() + + if test.expectedLifespan != lifespan { + t.Fatalf( + "Expected lifespan: %v, got: %v ", + test.expectedLifespan, + lifespan, + ) + } + }) + } +} diff --git a/chanfitness/chanfitness.go b/chanfitness/chanfitness.go new file mode 100644 index 000000000000..fb66a03b56e3 --- /dev/null +++ b/chanfitness/chanfitness.go @@ -0,0 +1,199 @@ +// Package chanfitness monitors the behaviour of channels to provide scores +// which can be factored into the decision to close a channel. This is +// achieved by maintaining an event store which tracks events for each +// channel. +// +// Lifetime: the total time the channel has been known to the scoring system. +// Note that this does not equal channel lifetime because data is not currently +// persisted. +package chanfitness + +import ( + "sync" + "time" + + "github.com/lightningnetwork/lnd/channeldb" + "github.com/lightningnetwork/lnd/channelnotifier" + "github.com/lightningnetwork/lnd/subscribe" +) + +// ChannelEventStore maintains a set of event logs for the node's channels. +// It is intended to provide insight into the performance of channels. +type ChannelEventStore struct { + cfg *Config + + channels map[uint64]*chanEventLog + + quit chan struct{} + + sync.RWMutex + + wg sync.WaitGroup +} + +// Config provides the event store with functions required to monitor channel activity. +// All elements of the config must be non-nil for the event store to operate. +type Config struct { + // Channel events provides a subscription client which provides a + // stream of channel events. + ChannelEvents func() (*subscribe.Client, error) + + // Get open channels provides a list of existing open channels + // which is used to populate the ChannelEventStore with a set of channels + // on startup. + GetOpenChannels func() ([]*channeldb.OpenChannel, error) +} + +// NewChannelEventStore initializes an event store with the config provided. Note that +// this function does not start the monitoring process, Start() must be called +// to start the monitoring process. +func NewChannelEventStore(config *Config) (*ChannelEventStore, error) { + store := &ChannelEventStore{ + cfg: config, + channels: make(map[uint64]*chanEventLog), + quit: make(chan struct{}), + } + + return store, nil +} + +// Start adds all existing open channels to the event store and starts monitoring +// for channel opened/closed events so that it can add and remove channels as +// appropriate. +func (c *ChannelEventStore) Start() error { + // Add the existing set of channels to the event store. This is required + // because channel events will not be triggered for channels that exist + // at startup time. + channels, err := c.cfg.GetOpenChannels() + if err != nil { + return err + } + + log.Infof("Adding %v channels to event store", len(channels)) + + for _, ch := range channels { + var peerKey [33]byte + copy(peerKey[:], ch.IdentityPub.SerializeCompressed()) + + c.openChannel(ch.ShortChanID().ToUint64(), peerKey) + } + + // channelUpdates provides a subscription to channel events which is used to + // listen for the creation of new channels and closing of existing channels. + channelUpdates, err := c.cfg.ChannelEvents() + if err != nil { + return err + } + + c.wg.Add(1) + go c.MonitorChannelEvents(channelUpdates.Updates(), channelUpdates.Cancel) + + return nil +} + +// Stop terminates all goroutines started by the event store. +func (c *ChannelEventStore) Stop() { + c.Lock() + defer c.Unlock() + + log.Info("Stopping event store") + + // Stop the MonitorChannelEvents goroutine. + close(c.quit) + + c.wg.Wait() +} + +func (c *ChannelEventStore) getEventLog(channelID uint64) (*chanEventLog, bool) { + c.Lock() + defer c.Unlock() + + eventLog, ok := c.channels[channelID] + return eventLog, ok +} + +// OpenChannel adds new channels to the ChannelEventStore's map of channels being +// monitored and records a channel opened event. If the channel is already +// present in the map, the function returns early. +func (c *ChannelEventStore) openChannel(channelID uint64, peer [33]byte) { + c.Lock() + defer c.Unlock() + + eventLog, ok := c.channels[channelID] + // The channel is not already known to the event store. + if ok { + return + } + + eventLog = &chanEventLog{ + id: channelID, + peer: peer, + now: time.Now, + } + + // Since this is the first time this channel has been seen, a + // channel opened event is recorded. Note that while channel events + // are not persisted, channel open events will be later than their + // actual open time. + eventLog.openChannel() + + c.channels[channelID] = eventLog +} + +// closeChannel records a closed event for a channel, and returns early is the +// channel is not known to the event store. +func (c *ChannelEventStore) closeChannel(channelID uint64) { + c.Lock() + defer c.Unlock() + + eventLog, ok := c.channels[channelID] + if !ok { + // Return early the channel is not known, since we have not + // been tracking events for it anyway. + return + } + + eventLog.closeChannel() +} + +// MonitorChannelEvents consumes channel opened and closed events and adds and +// updates the channel's event log accordingly. +func (c *ChannelEventStore) MonitorChannelEvents(updates <-chan interface{}, + cancel func()) { + + defer c.wg.Done() + + for { + select { + case e := <-updates: + switch event := e.(type) { + // A new channel has been opened, we must add a channel and record + // a channel open event. + case channelnotifier.OpenChannelEvent: + var peerKey [33]byte + copy( + peerKey[:], + event.Channel.IdentityPub.SerializeCompressed(), + ) + + c.openChannel( + event.Channel.ShortChanID().ToUint64(), + peerKey, + ) + + // A channel has been closed, we must remove the channel from the + // store and record a channel closed event. + case channelnotifier.ClosedChannelEvent: + c.closeChannel(event.CloseSummary.ShortChanID.ToUint64()) + } + + // receiving on the event store's quit channel indicates that monitoring + // should be terminated. To do so, cancel is called to terminate the + // subscription to channel updates. + case <-c.quit: + cancel() + return + } + + } +} diff --git a/chanfitness/chanfitness_test.go b/chanfitness/chanfitness_test.go new file mode 100644 index 000000000000..277ce76f43f4 --- /dev/null +++ b/chanfitness/chanfitness_test.go @@ -0,0 +1,132 @@ +package chanfitness + +import ( + "testing" + "time" + + "github.com/btcsuite/btcd/btcec" + "github.com/lightningnetwork/lnd/channeldb" + "github.com/lightningnetwork/lnd/channelnotifier" + "github.com/lightningnetwork/lnd/lntest" + "github.com/lightningnetwork/lnd/lnwire" +) + +func TestMonitorChannelEvents(t *testing.T) { + privKey, err := btcec.NewPrivateKey(btcec.S256()) + if err != nil { + t.Fatalf("Error getting pubkey: %v", err) + } + + shortID := lnwire.ShortChannelID{ + BlockHeight: 1234, + TxIndex: 2, + TxPosition: 2, + } + + tests := []struct { + name string + + cancel func() + + // event queue us a set of events that should be sent into the + // updates channel passed to MonitorChannelEvents. + eventQueue []interface{} + + // expectedEvents is an ordered set of events that we expect the + // to have been recorded by the event store for the channel. + expectedEvents []*channelEvent + }{ + + { + name: "Open channel event", + cancel: func() {}, + expectedEvents: []*channelEvent{ + { + eventType: channelOpenedEvent, + }, + }, + eventQueue: []interface{}{ + channelnotifier.OpenChannelEvent{ + Channel: &channeldb.OpenChannel{ + ShortChannelID: shortID, + IdentityPub: privKey.PubKey(), + }, + }, + }, + }, + { + name: "Close channel event", + cancel: func() {}, + expectedEvents: []*channelEvent{ + { + eventType: channelOpenedEvent, + }, + { + eventType: channelClosedEvent, + }, + }, + eventQueue: []interface{}{ + channelnotifier.OpenChannelEvent{ + Channel: &channeldb.OpenChannel{ + ShortChannelID: shortID, + IdentityPub: privKey.PubKey(), + }, + }, + channelnotifier.ClosedChannelEvent{ + CloseSummary: &channeldb.ChannelCloseSummary{ + ShortChanID: shortID, + RemotePub: privKey.PubKey(), + }, + }, + }, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + store := &ChannelEventStore{ + channels: map[uint64]*chanEventLog{}, + quit: make(chan struct{}), + } + + // Populate a buffered channel with the test event queue. + updates := make(chan interface{}, len(test.eventQueue)) + for _, e := range test.eventQueue { + updates <- e + } + + // Start goroutine to consume update events. + store.wg.Add(1) + go store.MonitorChannelEvents(updates, test.cancel) + + // Wait until the eventlog has expected number of events to prevent + // data races when we kill the monitoring goroutine. + err := lntest.WaitPredicate(func() (ok bool) { + eventlog, ok := store.getEventLog(shortID.ToUint64()) + if !ok { + return false + } + + return len(test.expectedEvents) == len(eventlog.events) + + }, time.Second) + + if err != nil { + t.Fatalf("Did not get expected number of events") + } + + // Check that the events in the log are as expected. + eventlog, _ := store.getEventLog(shortID.ToUint64()) + for i, e := range test.expectedEvents { + if eventlog.events[i].eventType != e.eventType { + t.Fatalf("Expected: event type: %v, got: %v", + eventlog.events[i].eventType, + e.eventType, + ) + } + } + + store.Stop() + }) + } +} diff --git a/chanfitness/log.go b/chanfitness/log.go new file mode 100644 index 000000000000..6297633a0b05 --- /dev/null +++ b/chanfitness/log.go @@ -0,0 +1,29 @@ +package chanfitness + +import ( + "github.com/btcsuite/btclog" + "github.com/lightningnetwork/lnd/build" +) + +// log is a logger that is initialized with no output filters. This +// means the package will not perform any logging by default until the caller +// requests it. +var log btclog.Logger + +// The default amount of logging is none. +func init() { + UseLogger(build.NewSubLogger("CHFT", nil)) +} + +// DisableLog disables all library log output. Logging output is disabled +// by default until UseLogger is called. +func DisableLog() { + UseLogger(btclog.Disabled) +} + +// UseLogger uses a specified Logger to output package logging info. +// This should be used in preference to SetLogWriter if the caller is also +// using btclog. +func UseLogger(logger btclog.Logger) { + log = logger +} diff --git a/log.go b/log.go index b79fa0472ee1..84a2fdc83705 100644 --- a/log.go +++ b/log.go @@ -15,6 +15,7 @@ import ( "github.com/lightningnetwork/lnd/build" "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/chanbackup" + "github.com/lightningnetwork/lnd/chanfitness" "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/channelnotifier" "github.com/lightningnetwork/lnd/contractcourt" @@ -89,6 +90,7 @@ var ( chbuLog = build.NewSubLogger("CHBU", backendLog.Logger) promLog = build.NewSubLogger("PROM", backendLog.Logger) wtclLog = build.NewSubLogger("WTCL", backendLog.Logger) + chftLog = build.NewSubLogger("CHFT", backendLog.Logger) ) // Initialize package-global logger variables. @@ -118,6 +120,7 @@ func init() { chanbackup.UseLogger(chbuLog) monitoring.UseLogger(promLog) wtclient.UseLogger(wtclLog) + chanfitness.UseLogger(chftLog) addSubLogger(routerrpc.Subsystem, routerrpc.UseLogger) } diff --git a/server.go b/server.go index 11f32cb8466a..81edc0e5f7dc 100644 --- a/server.go +++ b/server.go @@ -29,6 +29,7 @@ import ( "github.com/lightningnetwork/lnd/autopilot" "github.com/lightningnetwork/lnd/brontide" "github.com/lightningnetwork/lnd/chanbackup" + "github.com/lightningnetwork/lnd/chanfitness" "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/channelnotifier" "github.com/lightningnetwork/lnd/contractcourt" @@ -236,6 +237,10 @@ type server struct { // channelNotifier to be notified of newly opened and closed channels. chanSubSwapper *chanbackup.SubSwapper + // chanEventStore tracks the performance of channels to determine + // whether a channel should be closed. + chanEventStore *chanfitness.ChannelEventStore + quit chan struct{} wg sync.WaitGroup @@ -1081,6 +1086,17 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, return nil, err } + // Create a chnanel event store which monitors all open channels. + eventStore, err := chanfitness.NewChannelEventStore(&chanfitness.Config{ + ChannelEvents: s.channelNotifier.SubscribeChannelEvents, + GetOpenChannels: s.chanDB.FetchAllOpenChannels, + }) + if err != nil { + return nil, err + } + + s.chanEventStore = eventStore + if cfg.WtClient.IsActive() { policy := wtpolicy.DefaultPolicy() @@ -1235,6 +1251,11 @@ func (s *server) Start() error { return } + if err := s.chanEventStore.Start(); err != nil { + startErr = err + return + } + // Before we start the connMgr, we'll check to see if we have // any backups to recover. We do this now as we want to ensure // that have all the information we need to handle channel @@ -1349,6 +1370,7 @@ func (s *server) Stop() error { s.invoices.Stop() s.fundingMgr.Stop() s.chanSubSwapper.Stop() + s.chanEventStore.Stop() // Disconnect from each active peers to ensure that // peerTerminationWatchers signal completion to each peer.