-
Notifications
You must be signed in to change notification settings - Fork 2.1k
/
reliable_sender.go
330 lines (276 loc) · 10.2 KB
/
reliable_sender.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
package discovery
import (
"sync"
"github.com/lightningnetwork/lnd/lnpeer"
"github.com/lightningnetwork/lnd/lnwire"
)
// reliableSenderCfg bundles the dependencies the reliableSender needs in order
// to follow peer connectivity and to persist the messages it has to deliver.
type reliableSenderCfg struct {
	// NotifyWhenOnline lets the gossiper ask to be told, through peerChan,
	// once the peer identified by peerPubKey comes online so that sending
	// a message to it can be retried.
	//
	// NOTE: The peerChan channel must be buffered.
	NotifyWhenOnline func(peerPubKey [33]byte, peerChan chan<- lnpeer.Peer)

	// NotifyWhenOffline lets the gossiper ask to be told, through the
	// returned channel, once the peer identified by peerPubKey
	// disconnects, at which point a new online notification can be
	// requested.
	NotifyWhenOffline func(peerPubKey [33]byte) <-chan struct{}

	// MessageStore is the persistent store of gossip messages. It is
	// consulted to find out which messages still have to be resent to a
	// given peer.
	MessageStore GossipMessageStore

	// IsMsgStale reports whether a message read back from MessageStore is
	// considered stale by the current graph.
	IsMsgStale func(lnwire.Message) bool
}
// peerManager groups the channels used to communicate with the peerHandler
// goroutine of a single peer.
type peerManager struct {
	// msgs carries the messages that should be delivered to the peer. The
	// handler reads from it and forwards each message while the peer is
	// online.
	msgs chan lnwire.Message

	// done is closed once the handler for the peer has been torn down,
	// whatever the reason, so that senders stop waiting on msgs.
	done chan struct{}
}
// reliableSender is a small gossiper subsystem whose job is to make sure gossip
// messages are reliably delivered to peers.
type reliableSender struct {
	// start and stop guard Start and Stop so that each of them only takes
	// effect once.
	start, stop sync.Once

	cfg reliableSenderCfg

	// activePeers records, per peer public key, the peerManager of the
	// peerHandler currently running for that peer. A peerHandler services
	// the requests for messages that must be reliably sent to its peer
	// while following the peer's connection lifecycle.
	activePeers    map[[33]byte]peerManager
	activePeersMtx sync.Mutex

	// wg tracks the running peerHandler goroutines, and quit is closed to
	// ask all of them to exit.
	wg   sync.WaitGroup
	quit chan struct{}
}
// newReliableSender creates a reliableSender that operates on a copy of the
// given config. No peer handlers exist until Start or sendMessage spawns them.
func newReliableSender(cfg *reliableSenderCfg) *reliableSender {
	sender := new(reliableSender)
	sender.cfg = *cfg
	sender.activePeers = make(map[[33]byte]peerManager)
	sender.quit = make(chan struct{})

	return sender
}
// Start launches a message handler for every peer that still has pending
// messages. Only the first call does any work; later calls return nil.
func (s *reliableSender) Start() error {
	var startErr error

	spawnPending := func() {
		startErr = s.resendPendingMsgs()
	}
	s.start.Do(spawnPending)

	return startErr
}
// Stop prevents the reliable sender from sending any further messages to
// peers. It signals all peer handlers to exit and blocks until they have done
// so. Only the first call has any effect.
func (s *reliableSender) Stop() {
	shutdown := func() {
		// Closing quit wakes up every handler blocked in a select,
		// after which we wait for all of them to return.
		close(s.quit)
		s.wg.Wait()
	}

	s.stop.Do(shutdown)
}
// sendMessage requests that msg be reliably delivered to the peer identified
// by peerPubKey. The message is always written to disk first. If the peer is
// offline, that is all that happens now; the message is sent together with any
// other pending ones once the peer reconnects.
//
// It returns the store's error if persisting fails, and
// ErrGossiperShuttingDown if the sender is stopped while the message is being
// handed to the peer's handler.
func (s *reliableSender) sendMessage(msg lnwire.Message, peerPubKey [33]byte) error {
	// Persist the message before anything else so that it can be resent
	// after restarts and peer reconnections.
	err := s.cfg.MessageStore.AddMessage(msg, peerPubKey)
	if err != nil {
		return err
	}

	for {
		// Make sure a peerHandler is running for this peer. It is in
		// charge of resending the pending messages while tracking the
		// peer's connection lifecycle.
		handler, alreadyActive := s.spawnPeerHandler(peerPubKey)

		// A freshly spawned handler will pick the message up from the
		// store once the online notification arrives, so handing it
		// over here as well could result in sending it twice.
		if !alreadyActive {
			return nil
		}

		// Stream the message to the existing handler. The handler may
		// be torn down concurrently when all of the messages it sent
		// turned out to be stale; in that case we go around again and
		// spawn a new one rather than blocking.
		select {
		case handler.msgs <- msg:
			return nil

		case <-handler.done:

		case <-s.quit:
			return ErrGossiperShuttingDown
		}
	}
}
// spawnPeerHandler makes sure a peerHandler is running for the given peer,
// launching one if none is registered yet. It returns the peerManager of the
// handler together with a boolean that is true if the handler already existed
// and false if it was just spawned.
func (s *reliableSender) spawnPeerHandler(peerPubKey [33]byte) (peerManager, bool) {
	s.activePeersMtx.Lock()
	defer s.activePeersMtx.Unlock()

	// Reuse the handler that is already registered, if any.
	if existing, found := s.activePeers[peerPubKey]; found {
		return existing, true
	}

	// Otherwise register a new one and launch its goroutine while still
	// holding the lock, so that at most one handler exists per peer.
	fresh := peerManager{
		msgs: make(chan lnwire.Message),
		done: make(chan struct{}),
	}
	s.activePeers[peerPubKey] = fresh

	s.wg.Add(1)
	go s.peerHandler(fresh, peerPubKey)

	return fresh, false
}
// peerHandler is responsible for handling our reliable message send requests
// for a given peer while also taking into account the peer's connection
// lifecycle. Requests that arrive while the peer is offline are only drained
// from the channel; the messages themselves are read back from the message
// store and sent once the peer reconnects.
//
// The handler exits when the sender is shutting down, when no pending messages
// remain for the peer after a send pass, or when the message store cannot be
// read. In the latter two cases the handler unregisters itself and closes
// peerMgr.done so that sendMessage spawns a replacement rather than blocking
// on a handler that no longer exists.
//
// NOTE: This must be run as a goroutine.
func (s *reliableSender) peerHandler(peerMgr peerManager, peerPubKey [33]byte) {
	defer s.wg.Done()

	// tearDown unregisters this handler and closes its done channel. It
	// must be called at most once, right before returning.
	tearDown := func() {
		s.activePeersMtx.Lock()
		delete(s.activePeers, peerPubKey)
		s.activePeersMtx.Unlock()

		close(peerMgr.done)
	}

	// We'll start by requesting a notification for when the peer
	// reconnects. The channel is buffered as required by NotifyWhenOnline.
	peerChan := make(chan lnpeer.Peer, 1)

waitUntilOnline:
	log.Debugf("Requesting online notification for peer=%x", peerPubKey)

	s.cfg.NotifyWhenOnline(peerPubKey, peerChan)

	var peer lnpeer.Peer
out:
	for {
		select {
		// While we're waiting, we'll also consume any messages that
		// must be sent to prevent blocking the caller. These can be
		// ignored for now since the peer is currently offline. Once
		// they reconnect, the messages will be sent since they should
		// have been persisted to disk.
		case msg := <-peerMgr.msgs:
			// Retrieve the short channel ID for which this message
			// applies for logging purposes. The error can be
			// ignored as the store can only contain messages which
			// have a ShortChannelID field.
			shortChanID, _ := msgShortChanID(msg)

			log.Debugf("Received request to send %v message for "+
				"channel=%v while peer=%x is offline",
				msg.MsgType(), shortChanID, peerPubKey)

		case peer = <-peerChan:
			break out

		case <-s.quit:
			return
		}
	}

	log.Debugf("Peer=%x is now online, proceeding to send pending messages",
		peerPubKey)

	// Once we detect the peer has reconnected, we'll also request a
	// notification for when they disconnect. We'll use this to make sure
	// they haven't disconnected (in the case of a flappy peer, etc.) by the
	// time we attempt to send them the pending messages.
	log.Debugf("Requesting offline notification for peer=%x", peerPubKey)

	offlineChan := s.cfg.NotifyWhenOffline(peerPubKey)

	pendingMsgs, err := s.cfg.MessageStore.MessagesForPeer(peerPubKey)
	if err != nil {
		log.Errorf("Unable to retrieve pending messages for peer %x: %v",
			peerPubKey, err)

		// Unregister before exiting, otherwise later sendMessage calls
		// for this peer would block on a handler that is gone.
		tearDown()
		return
	}

	// With the peer online, we can now proceed to send our pending messages
	// for them.
	for _, msg := range pendingMsgs {
		// Retrieve the short channel ID for which this message applies
		// for logging purposes. The error can be ignored as the store
		// can only contain messages which have a ShortChannelID field.
		shortChanID, _ := msgShortChanID(msg)

		// Ensure the peer is still online right before sending the
		// message.
		select {
		case <-offlineChan:
			goto waitUntilOnline
		default:
		}

		if err := peer.SendMessage(false, msg); err != nil {
			log.Errorf("Unable to send %v message for channel=%v "+
				"to %x: %v", msg.MsgType(), shortChanID,
				peerPubKey, err)
			goto waitUntilOnline
		}

		log.Debugf("Successfully sent %v message for channel=%v with "+
			"peer=%x upon reconnection", msg.MsgType(), shortChanID,
			peerPubKey)

		// Now that the message has at least been sent once, we can
		// check whether it's stale. This guarantees that
		// AnnounceSignatures are sent at least once if we happen to
		// already have signatures for both parties.
		if s.cfg.IsMsgStale(msg) {
			err := s.cfg.MessageStore.DeleteMessage(msg, peerPubKey)
			if err != nil {
				log.Errorf("Unable to remove stale %v message "+
					"for channel=%v with peer %x: %v",
					msg.MsgType(), shortChanID, peerPubKey,
					err)
				continue
			}

			log.Debugf("Removed stale %v message for channel=%v "+
				"with peer=%x", msg.MsgType(), shortChanID,
				peerPubKey)
		}
	}

	// If all of our messages were stale, then there's no need for this
	// handler to continue running, so we can exit now.
	pendingMsgs, err = s.cfg.MessageStore.MessagesForPeer(peerPubKey)
	if err != nil {
		log.Errorf("Unable to retrieve pending messages for peer %x: %v",
			peerPubKey, err)

		// As above, never leave a dead handler registered.
		tearDown()
		return
	}

	if len(pendingMsgs) == 0 {
		log.Debugf("No pending messages left for peer=%x", peerPubKey)

		tearDown()
		return
	}

	// Once the pending messages are sent, we can continue to send any
	// future messages while the peer remains connected.
	for {
		select {
		case msg := <-peerMgr.msgs:
			// Retrieve the short channel ID for which this message
			// applies for logging purposes. The error can be
			// ignored as the store can only contain messages which
			// have a ShortChannelID field.
			shortChanID, _ := msgShortChanID(msg)

			if err := peer.SendMessage(false, msg); err != nil {
				log.Errorf("Unable to send %v message for "+
					"channel=%v to %x: %v", msg.MsgType(),
					shortChanID, peerPubKey, err)

				// Skip the success log below. The message is
				// still persisted, so it will be retried on
				// the next send pass for this peer.
				continue
			}

			log.Debugf("Successfully sent %v message for "+
				"channel=%v with peer=%x", msg.MsgType(),
				shortChanID, peerPubKey)

		case <-offlineChan:
			goto waitUntilOnline

		case <-s.quit:
			return
		}
	}
}
// resendPendingMsgs makes sure that every peer with messages left in the
// message store has a peerHandler running. The handlers take care of sending
// the pending messages once their peer is seen as online. An error is returned
// only if the set of peers cannot be read from the store.
func (s *reliableSender) resendPendingMsgs() error {
	pendingPeers, err := s.cfg.MessageStore.Peers()
	if err != nil {
		return err
	}

	// Whether a handler already existed is irrelevant here; all we need
	// is for one to be running for each peer.
	for pubKey := range pendingPeers {
		s.spawnPeerHandler(pubKey)
	}

	return nil
}