-
Notifications
You must be signed in to change notification settings - Fork 2.1k
/
manager.go
389 lines (322 loc) · 9.59 KB
/
manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
package autopilot
import (
"fmt"
"sync"
"github.com/btcsuite/btcd/btcec"
"github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/routing"
)
// ManagerCfg houses a set of values and methods that is passed to the Manager
// for it to properly manage its autopilot agent.
type ManagerCfg struct {
// Self is the public key of the lnd instance. It is used to making
// sure the autopilot is not opening channels to itself.
Self *btcec.PublicKey
// PilotCfg is the config of the autopilot agent managed by the
// Manager.
PilotCfg *Config
// ChannelState is a function closure that returns the current set of
// channels managed by this node.
ChannelState func() ([]Channel, error)
// SubscribeTransactions is used to get a subscription for transactions
// relevant to this node's wallet.
SubscribeTransactions func() (lnwallet.TransactionSubscription, error)
// SubscribeTopology is used to get a subscription for topology changes
// on the network.
SubscribeTopology func() (*routing.TopologyClient, error)
}
// Manager is struct that manages an autopilot agent, making it possible to
// enable and disable it at will, and hand it relevant external information.
// It implements the autopilot grpc service, which is used to get data about
// the running autopilot, and give it relevant information.
type Manager struct {
started sync.Once
stopped sync.Once
cfg *ManagerCfg
// pilot is the current autopilot agent. It will be nil if the agent is
// disabled.
pilot *Agent
quit chan struct{}
wg sync.WaitGroup
sync.Mutex
}
// NewManager creates a new instance of the Manager from the passed config.
func NewManager(cfg *ManagerCfg) (*Manager, error) {
return &Manager{
cfg: cfg,
quit: make(chan struct{}),
}, nil
}
// Start starts the Manager.
func (m *Manager) Start() error {
m.started.Do(func() {})
return nil
}
// Stop stops the Manager. If an autopilot agent is active, it will also be
// stopped.
func (m *Manager) Stop() error {
m.stopped.Do(func() {
if err := m.StopAgent(); err != nil {
log.Errorf("Unable to stop pilot: %v", err)
}
close(m.quit)
m.wg.Wait()
})
return nil
}
// IsActive returns whether the autopilot agent is currently active.
func (m *Manager) IsActive() bool {
m.Lock()
defer m.Unlock()
return m.pilot != nil
}
// StartAgent creates and starts an autopilot agent from the Manager's
// config.
func (m *Manager) StartAgent() error {
m.Lock()
defer m.Unlock()
// Already active.
if m.pilot != nil {
return nil
}
// Next, we'll fetch the current state of open channels from the
// database to use as initial state for the auto-pilot agent.
initialChanState, err := m.cfg.ChannelState()
if err != nil {
return err
}
// Now that we have all the initial dependencies, we can create the
// auto-pilot instance itself.
pilot, err := New(*m.cfg.PilotCfg, initialChanState)
if err != nil {
return err
}
if err := pilot.Start(); err != nil {
return err
}
// Finally, we'll need to subscribe to two things: incoming
// transactions that modify the wallet's balance, and also any graph
// topology updates.
txnSubscription, err := m.cfg.SubscribeTransactions()
if err != nil {
pilot.Stop()
return err
}
graphSubscription, err := m.cfg.SubscribeTopology()
if err != nil {
txnSubscription.Cancel()
pilot.Stop()
return err
}
m.pilot = pilot
// We'll launch a goroutine to provide the agent with notifications
// whenever the balance of the wallet changes.
// TODO(halseth): can lead to panic if in process of shutting down.
m.wg.Add(1)
go func() {
defer txnSubscription.Cancel()
defer m.wg.Done()
for {
select {
case <-txnSubscription.ConfirmedTransactions():
pilot.OnBalanceChange()
// We won't act upon new unconfirmed transaction, as
// we'll only use confirmed outputs when funding.
// However, we will still drain this request in order
// to avoid goroutine leaks, and ensure we promptly
// read from the channel if available.
case <-txnSubscription.UnconfirmedTransactions():
case <-pilot.quit:
return
case <-m.quit:
return
}
}
}()
// We'll also launch a goroutine to provide the agent with
// notifications for when the graph topology controlled by the node
// changes.
m.wg.Add(1)
go func() {
defer graphSubscription.Cancel()
defer m.wg.Done()
for {
select {
case topChange, ok := <-graphSubscription.TopologyChanges:
// If the router is shutting down, then we will
// as well.
if !ok {
return
}
for _, edgeUpdate := range topChange.ChannelEdgeUpdates {
// If this isn't an advertisement by
// the backing lnd node, then we'll
// continue as we only want to add
// channels that we've created
// ourselves.
if !edgeUpdate.AdvertisingNode.IsEqual(m.cfg.Self) {
continue
}
// If this is indeed a channel we
// opened, then we'll convert it to the
// autopilot.Channel format, and notify
// the pilot of the new channel.
chanNode := NewNodeID(
edgeUpdate.ConnectingNode,
)
chanID := lnwire.NewShortChanIDFromInt(
edgeUpdate.ChanID,
)
edge := Channel{
ChanID: chanID,
Capacity: edgeUpdate.Capacity,
Node: chanNode,
}
pilot.OnChannelOpen(edge)
}
// For each closed channel, we'll obtain
// the chanID of the closed channel and send it
// to the pilot.
for _, chanClose := range topChange.ClosedChannels {
chanID := lnwire.NewShortChanIDFromInt(
chanClose.ChanID,
)
pilot.OnChannelClose(chanID)
}
// If new nodes were added to the graph, or nod
// information has changed, we'll poke autopilot
// to see if it can make use of them.
if len(topChange.NodeUpdates) > 0 {
pilot.OnNodeUpdates()
}
case <-pilot.quit:
return
case <-m.quit:
return
}
}
}()
log.Debugf("Manager started autopilot agent")
return nil
}
// StopAgent stops any active autopilot agent.
func (m *Manager) StopAgent() error {
m.Lock()
defer m.Unlock()
// Not active, so we can return early.
if m.pilot == nil {
return nil
}
if err := m.pilot.Stop(); err != nil {
return err
}
// Make sure to nil the current agent, indicating it is no longer
// active.
m.pilot = nil
log.Debugf("Manager stopped autopilot agent")
return nil
}
// QueryHeuristics queries the available autopilot heuristics for node scores.
func (m *Manager) QueryHeuristics(nodes []NodeID, localState bool) (
HeuristicScores, error) {
m.Lock()
defer m.Unlock()
n := make(map[NodeID]struct{})
for _, node := range nodes {
n[node] = struct{}{}
}
log.Debugf("Querying heuristics for %d nodes", len(n))
return m.queryHeuristics(n, localState)
}
// HeuristicScores is an alias for a map that maps heuristic names to a map of
// scores for pubkeys.
type HeuristicScores map[string]map[NodeID]float64
// queryHeuristics gets node scores from all available simple heuristics, and
// the agent's current active heuristic.
//
// NOTE: Must be called with the manager's lock.
func (m *Manager) queryHeuristics(nodes map[NodeID]struct{}, localState bool) (
HeuristicScores, error) {
// If we want to take the local state into action when querying the
// heuristics, we fetch it. If not we'll just pass an emply slice to
// the heuristic.
var totalChans []Channel
var err error
if localState {
// Fetch the current set of channels.
totalChans, err = m.cfg.ChannelState()
if err != nil {
return nil, err
}
// If the agent is active, we can merge the channel state with
// the channels pending open.
if m.pilot != nil {
m.pilot.chanStateMtx.Lock()
m.pilot.pendingMtx.Lock()
totalChans = mergeChanState(
m.pilot.pendingOpens, m.pilot.chanState,
)
m.pilot.pendingMtx.Unlock()
m.pilot.chanStateMtx.Unlock()
}
}
// As channel size we'll use the maximum size.
chanSize := m.cfg.PilotCfg.Constraints.MaxChanSize()
// We'll start by getting the scores from each available sub-heuristic,
// in addition the current agent heuristic.
var heuristics []AttachmentHeuristic
heuristics = append(heuristics, availableHeuristics...)
heuristics = append(heuristics, m.cfg.PilotCfg.Heuristic)
report := make(HeuristicScores)
for _, h := range heuristics {
name := h.Name()
// If the agent heuristic is among the simple heuristics it
// might get queried more than once. As an optimization we'll
// just skip it the second time.
if _, ok := report[name]; ok {
continue
}
s, err := h.NodeScores(
m.cfg.PilotCfg.Graph, totalChans, chanSize, nodes,
)
if err != nil {
return nil, fmt.Errorf("unable to get sub score: %v",
err)
}
log.Debugf("Heuristic \"%v\" scored %d nodes", name, len(s))
scores := make(map[NodeID]float64)
for nID, score := range s {
scores[nID] = score.Score
}
report[name] = scores
}
return report, nil
}
// SetNodeScores is used to set the scores of the given heuristic, if it is
// active, and ScoreSettable.
func (m *Manager) SetNodeScores(name string, scores map[NodeID]float64) error {
m.Lock()
defer m.Unlock()
// It must be ScoreSettable to be available for external
// scores.
s, ok := m.cfg.PilotCfg.Heuristic.(ScoreSettable)
if !ok {
return fmt.Errorf("current heuristic doesn't support " +
"external scoring")
}
// Heuristic was found, set its node scores.
applied, err := s.SetNodeScores(name, scores)
if err != nil {
return err
}
if !applied {
return fmt.Errorf("heuristic with name %v not found", name)
}
// If the autopilot agent is active, notify about the updated
// heuristic.
if m.pilot != nil {
m.pilot.OnHeuristicUpdate(m.cfg.PilotCfg.Heuristic)
}
return nil
}