-
Notifications
You must be signed in to change notification settings - Fork 2.1k
/
channelnotifier.go
219 lines (188 loc) · 7.55 KB
/
channelnotifier.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
package channelnotifier
import (
"sync"
"github.com/btcsuite/btcd/wire"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/subscribe"
)
// ChannelNotifier is a subsystem which all active, inactive, and closed channel
// events pipe through. It takes subscriptions for its events, and whenever
// it receives a new event it notifies its subscribers over the proper channel.
type ChannelNotifier struct {
started sync.Once
stopped sync.Once
ntfnServer *subscribe.Server
chanDB *channeldb.ChannelStateDB
}
// PendingOpenChannelEvent represents a new event where a new channel has
// entered a pending open state.
type PendingOpenChannelEvent struct {
// ChannelPoint is the channel outpoint for the new channel.
ChannelPoint *wire.OutPoint
// PendingChannel is the channel configuration for the newly created
// channel. This might not have been persisted to the channel DB yet
// because we are still waiting for the final message from the remote
// peer.
PendingChannel *channeldb.OpenChannel
}
// OpenChannelEvent represents a new event where a channel goes from pending
// open to open.
type OpenChannelEvent struct {
// Channel is the channel that has become open.
Channel *channeldb.OpenChannel
}
// ActiveLinkEvent represents a new event where the link becomes active in the
// switch. This happens before the ActiveChannelEvent.
type ActiveLinkEvent struct {
// ChannelPoint is the channel point for the newly active channel.
ChannelPoint *wire.OutPoint
}
// InactiveLinkEvent represents a new event where the link becomes inactive in
// the switch.
type InactiveLinkEvent struct {
// ChannelPoint is the channel point for the inactive channel.
ChannelPoint *wire.OutPoint
}
// ActiveChannelEvent represents a new event where a channel becomes active.
type ActiveChannelEvent struct {
// ChannelPoint is the channelpoint for the newly active channel.
ChannelPoint *wire.OutPoint
}
// InactiveChannelEvent represents a new event where a channel becomes inactive.
type InactiveChannelEvent struct {
// ChannelPoint is the channelpoint for the newly inactive channel.
ChannelPoint *wire.OutPoint
}
// ClosedChannelEvent represents a new event where a channel becomes closed.
type ClosedChannelEvent struct {
// CloseSummary is the summary of the channel close that has occurred.
CloseSummary *channeldb.ChannelCloseSummary
}
// FullyResolvedChannelEvent represents a new event where a channel becomes
// fully resolved.
type FullyResolvedChannelEvent struct {
// ChannelPoint is the channelpoint for the newly fully resolved
// channel.
ChannelPoint *wire.OutPoint
}
// New creates a new channel notifier. The ChannelNotifier gets channel
// events from peers and from the chain arbitrator, and dispatches them to
// its clients.
func New(chanDB *channeldb.ChannelStateDB) *ChannelNotifier {
return &ChannelNotifier{
ntfnServer: subscribe.NewServer(),
chanDB: chanDB,
}
}
// Start starts the ChannelNotifier and all goroutines it needs to carry out its task.
func (c *ChannelNotifier) Start() error {
var err error
c.started.Do(func() {
log.Info("ChannelNotifier starting")
err = c.ntfnServer.Start()
})
return err
}
// Stop signals the notifier for a graceful shutdown.
func (c *ChannelNotifier) Stop() error {
var err error
c.stopped.Do(func() {
log.Info("ChannelNotifier shutting down")
err = c.ntfnServer.Stop()
})
return err
}
// SubscribeChannelEvents returns a subscribe.Client that will receive updates
// any time the Server is made aware of a new event. The subscription provides
// channel events from the point of subscription onwards.
//
// TODO(carlaKC): update to allow subscriptions to specify a block height from
// which we would like to subscribe to events.
func (c *ChannelNotifier) SubscribeChannelEvents() (*subscribe.Client, error) {
return c.ntfnServer.Subscribe()
}
// NotifyPendingOpenChannelEvent notifies the channelEventNotifier goroutine
// that a new channel is pending. The pending channel is passed as a parameter
// instead of read from the database because it might not yet have been
// persisted to the DB because we still wait for the final message from the
// remote peer.
func (c *ChannelNotifier) NotifyPendingOpenChannelEvent(chanPoint wire.OutPoint,
pendingChan *channeldb.OpenChannel) {
event := PendingOpenChannelEvent{
ChannelPoint: &chanPoint,
PendingChannel: pendingChan,
}
if err := c.ntfnServer.SendUpdate(event); err != nil {
log.Warnf("Unable to send pending open channel update: %v", err)
}
}
// NotifyOpenChannelEvent notifies the channelEventNotifier goroutine that a
// channel has gone from pending open to open.
func (c *ChannelNotifier) NotifyOpenChannelEvent(chanPoint wire.OutPoint) {
// Fetch the relevant channel from the database.
channel, err := c.chanDB.FetchChannel(nil, chanPoint)
if err != nil {
log.Warnf("Unable to fetch open channel from the db: %v", err)
}
// Send the open event to all channel event subscribers.
event := OpenChannelEvent{Channel: channel}
if err := c.ntfnServer.SendUpdate(event); err != nil {
log.Warnf("Unable to send open channel update: %v", err)
}
}
// NotifyClosedChannelEvent notifies the channelEventNotifier goroutine that a
// channel has closed.
func (c *ChannelNotifier) NotifyClosedChannelEvent(chanPoint wire.OutPoint) {
// Fetch the relevant closed channel from the database.
closeSummary, err := c.chanDB.FetchClosedChannel(&chanPoint)
if err != nil {
log.Warnf("Unable to fetch closed channel summary from the db: %v", err)
}
// Send the closed event to all channel event subscribers.
event := ClosedChannelEvent{CloseSummary: closeSummary}
if err := c.ntfnServer.SendUpdate(event); err != nil {
log.Warnf("Unable to send closed channel update: %v", err)
}
}
// NotifyFullyResolvedChannelEvent notifies the channelEventNotifier goroutine
// that a channel was fully resolved on chain.
func (c *ChannelNotifier) NotifyFullyResolvedChannelEvent(
chanPoint wire.OutPoint) {
// Send the resolved event to all channel event subscribers.
event := FullyResolvedChannelEvent{ChannelPoint: &chanPoint}
if err := c.ntfnServer.SendUpdate(event); err != nil {
log.Warnf("Unable to send resolved channel update: %v", err)
}
}
// NotifyActiveLinkEvent notifies the channelEventNotifier goroutine that a
// link has been added to the switch.
func (c *ChannelNotifier) NotifyActiveLinkEvent(chanPoint wire.OutPoint) {
event := ActiveLinkEvent{ChannelPoint: &chanPoint}
if err := c.ntfnServer.SendUpdate(event); err != nil {
log.Warnf("Unable to send active link update: %v", err)
}
}
// NotifyActiveChannelEvent notifies the channelEventNotifier goroutine that a
// channel is active.
func (c *ChannelNotifier) NotifyActiveChannelEvent(chanPoint wire.OutPoint) {
event := ActiveChannelEvent{ChannelPoint: &chanPoint}
if err := c.ntfnServer.SendUpdate(event); err != nil {
log.Warnf("Unable to send active channel update: %v", err)
}
}
// NotifyInactiveLinkEvent notifies the channelEventNotifier goroutine that a
// link has been removed from the switch.
func (c *ChannelNotifier) NotifyInactiveLinkEvent(chanPoint wire.OutPoint) {
event := InactiveLinkEvent{ChannelPoint: &chanPoint}
if err := c.ntfnServer.SendUpdate(event); err != nil {
log.Warnf("Unable to send inactive link update: %v", err)
}
}
// NotifyInactiveChannelEvent notifies the channelEventNotifier goroutine that a
// channel is inactive.
func (c *ChannelNotifier) NotifyInactiveChannelEvent(chanPoint wire.OutPoint) {
event := InactiveChannelEvent{ChannelPoint: &chanPoint}
if err := c.ntfnServer.SendUpdate(event); err != nil {
log.Warnf("Unable to send inactive channel update: %v", err)
}
}