-
Notifications
You must be signed in to change notification settings - Fork 2.1k
/
server.go
436 lines (358 loc) · 11.3 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
package wtserver
import (
"bytes"
"errors"
"fmt"
"net"
"sync"
"time"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/connmgr"
"github.com/btcsuite/btcutil"
"github.com/lightningnetwork/lnd/keychain"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/watchtower/wtdb"
"github.com/lightningnetwork/lnd/watchtower/wtwire"
)
var (
// ErrPeerAlreadyConnected signals that a peer with the same session id
// is already active within the server.
ErrPeerAlreadyConnected = errors.New("peer already connected")
// ErrServerExiting signals that a request could not be processed
// because the server has been requested to shut down.
ErrServerExiting = errors.New("server shutting down")
)
// Config abstracts the primary components and dependencies of the server.
type Config struct {
// DB provides persistent access to the server's sessions and for
// storing state updates.
DB DB
// NodeKeyECDH is the the ECDH capable wrapper of the key to be used in
// accepting new brontide connections.
NodeKeyECDH keychain.SingleKeyECDH
// Listeners specifies which address to which clients may connect.
Listeners []net.Listener
// ReadTimeout specifies how long a client may go without sending a
// message.
ReadTimeout time.Duration
// WriteTimeout specifies how long a client may go without reading a
// message from the other end, if the connection has stopped buffering
// the server's replies.
WriteTimeout time.Duration
// NewAddress is used to generate reward addresses, where a cut of
// successfully sent funds can be received.
NewAddress func() (btcutil.Address, error)
// ChainHash identifies the network that the server is watching.
ChainHash chainhash.Hash
// NoAckCreateSession causes the server to not reply to create session
// requests, this should only be used for testing.
NoAckCreateSession bool
// NoAckUpdates causes the server to not acknowledge state updates, this
// should only be used for testing.
NoAckUpdates bool
// DisableReward causes the server to reject any session creation
// attempts that request rewards.
DisableReward bool
}
// Server houses the state required to handle watchtower peers. It's primary job
// is to accept incoming connections, and dispatch processing of the client
// message streams.
type Server struct {
started sync.Once
stopped sync.Once
cfg *Config
connMgr *connmgr.ConnManager
clientMtx sync.RWMutex
clients map[wtdb.SessionID]Peer
newPeers chan Peer
localInit *wtwire.Init
wg sync.WaitGroup
quit chan struct{}
}
// New creates a new server to handle watchtower clients. The server will accept
// clients connecting to the listener addresses, and allows them to open
// sessions and send state updates.
func New(cfg *Config) (*Server, error) {
localInit := wtwire.NewInitMessage(
lnwire.NewRawFeatureVector(
wtwire.AltruistSessionsOptional,
wtwire.AnchorCommitOptional,
),
cfg.ChainHash,
)
s := &Server{
cfg: cfg,
clients: make(map[wtdb.SessionID]Peer),
newPeers: make(chan Peer),
localInit: localInit,
quit: make(chan struct{}),
}
connMgr, err := connmgr.New(&connmgr.Config{
Listeners: cfg.Listeners,
OnAccept: s.inboundPeerConnected,
Dial: noDial,
})
if err != nil {
return nil, err
}
s.connMgr = connMgr
return s, nil
}
// Start begins listening on the server's listeners.
func (s *Server) Start() error {
s.started.Do(func() {
log.Infof("Starting watchtower server")
s.wg.Add(1)
go s.peerHandler()
s.connMgr.Start()
log.Infof("Watchtower server started successfully")
})
return nil
}
// Stop shutdowns down the server's listeners and any active requests.
func (s *Server) Stop() error {
s.stopped.Do(func() {
log.Infof("Stopping watchtower server")
s.connMgr.Stop()
close(s.quit)
s.wg.Wait()
log.Infof("Watchtower server stopped successfully")
})
return nil
}
// inboundPeerConnected is the callback given to the connection manager, and is
// called each time a new connection is made to the watchtower. This method
// proxies the new peers by filtering out those that do not satisfy the
// server.Peer interface, and closes their connection. Successful connections
// will be passed on to the public InboundPeerConnected method.
func (s *Server) inboundPeerConnected(c net.Conn) {
peer, ok := c.(Peer)
if !ok {
log.Warnf("incoming connection %T does not satisfy "+
"server.Peer interface", c)
c.Close()
return
}
s.InboundPeerConnected(peer)
}
// InboundPeerConnected accepts a server.Peer, and handles the request submitted
// by the client. This method serves also as a public endpoint for locally
// registering new clients with the server.
func (s *Server) InboundPeerConnected(peer Peer) {
select {
case s.newPeers <- peer:
case <-s.quit:
}
}
// peerHandler processes newly accepted peers and spawns a client handler for
// each. The peerHandler is used to ensure that waitgrouped client handlers are
// spawned from a waitgrouped goroutine.
func (s *Server) peerHandler() {
defer s.wg.Done()
defer s.removeAllPeers()
for {
select {
case peer := <-s.newPeers:
s.wg.Add(1)
go s.handleClient(peer)
case <-s.quit:
return
}
}
}
// handleClient processes a series watchtower messages sent by a client. The
// client may either send:
// * a single CreateSession message.
// * a series of StateUpdate messages.
//
// This method uses the server's peer map to ensure at most one peer using the
// same session id can enter the main event loop. The connection will be
// dropped by the watchtower if no messages are sent or received by the
// configured Read/WriteTimeouts.
//
// NOTE: This method MUST be run as a goroutine.
func (s *Server) handleClient(peer Peer) {
defer s.wg.Done()
// Use the connection's remote pubkey as the client's session id.
id := wtdb.NewSessionIDFromPubKey(peer.RemotePub())
// Register this peer in the server's client map, and defer the
// connection's cleanup. If the peer already exists, we will close the
// connection and exit immediately.
err := s.addPeer(&id, peer)
if err != nil {
peer.Close()
return
}
defer s.removePeer(&id, peer.RemoteAddr())
msg, err := s.readMessage(peer)
if err != nil {
log.Errorf("Unable to read message from client %s@%s: %v",
id, peer.RemoteAddr(), err)
return
}
remoteInit, ok := msg.(*wtwire.Init)
if !ok {
log.Errorf("Client %s@%s did not send Init msg as first "+
"message", id, peer.RemoteAddr())
return
}
err = s.sendMessage(peer, s.localInit)
if err != nil {
log.Errorf("Unable to send Init msg to %s: %v", id, err)
return
}
err = s.localInit.CheckRemoteInit(remoteInit, wtwire.FeatureNames)
if err != nil {
log.Errorf("Cannot support client %s: %v", id, err)
return
}
nextMsg, err := s.readMessage(peer)
if err != nil {
log.Errorf("Unable to read watchtower msg from %s: %v",
id, err)
return
}
switch msg := nextMsg.(type) {
case *wtwire.CreateSession:
// Attempt to open a new session for this client.
err = s.handleCreateSession(peer, &id, msg)
if err != nil {
log.Errorf("Unable to handle CreateSession "+
"from %s: %v", id, err)
}
case *wtwire.DeleteSession:
err = s.handleDeleteSession(peer, &id)
if err != nil {
log.Errorf("Unable to handle DeleteSession "+
"from %s: %v", id, err)
}
case *wtwire.StateUpdate:
err = s.handleStateUpdates(peer, &id, msg)
if err != nil {
log.Errorf("Unable to handle StateUpdate "+
"from %s: %v", id, err)
}
default:
log.Errorf("Received unsupported message type: %T "+
"from %s", nextMsg, id)
}
}
// connFailure is a default error used when a request failed with a non-zero
// error code.
type connFailure struct {
ID wtdb.SessionID
Code wtwire.ErrorCode
}
// Error displays the SessionID and Code that caused the connection failure.
func (f *connFailure) Error() string {
return fmt.Sprintf("connection with %s failed with code=%s",
f.ID, f.Code,
)
}
// readMessage receives and parses the next message from the given Peer. An
// error is returned if a message is not received before the server's read
// timeout, the read off the wire failed, or the message could not be
// deserialized.
func (s *Server) readMessage(peer Peer) (wtwire.Message, error) {
// Set a read timeout to ensure we drop the client if not sent in a
// timely manner.
err := peer.SetReadDeadline(time.Now().Add(s.cfg.ReadTimeout))
if err != nil {
err = fmt.Errorf("unable to set read deadline: %v", err)
return nil, err
}
// Pull the next message off the wire, and parse it according to the
// watchtower wire specification.
rawMsg, err := peer.ReadNextMessage()
if err != nil {
err = fmt.Errorf("unable to read message: %v", err)
return nil, err
}
msgReader := bytes.NewReader(rawMsg)
msg, err := wtwire.ReadMessage(msgReader, 0)
if err != nil {
err = fmt.Errorf("unable to parse message: %v", err)
return nil, err
}
logMessage(peer, msg, true)
return msg, nil
}
// sendMessage sends a watchtower wire message to the target peer.
func (s *Server) sendMessage(peer Peer, msg wtwire.Message) error {
// TODO(conner): use buffer pool?
var b bytes.Buffer
_, err := wtwire.WriteMessage(&b, msg, 0)
if err != nil {
err = fmt.Errorf("unable to encode msg: %v", err)
return err
}
err = peer.SetWriteDeadline(time.Now().Add(s.cfg.WriteTimeout))
if err != nil {
err = fmt.Errorf("unable to set write deadline: %v", err)
return err
}
logMessage(peer, msg, false)
_, err = peer.Write(b.Bytes())
return err
}
// addPeer stores a client in the server's client map. An error is returned if a
// client with the same session id already exists.
func (s *Server) addPeer(id *wtdb.SessionID, peer Peer) error {
s.clientMtx.Lock()
defer s.clientMtx.Unlock()
if existingPeer, ok := s.clients[*id]; ok {
log.Infof("Already connected to peer %s@%s, disconnecting %s",
id, existingPeer.RemoteAddr(), peer.RemoteAddr())
return ErrPeerAlreadyConnected
}
s.clients[*id] = peer
log.Infof("Accepted incoming peer %s@%s",
id, peer.RemoteAddr())
return nil
}
// removePeer deletes a client from the server's client map. If a peer is found,
// this method will close the peer's connection.
func (s *Server) removePeer(id *wtdb.SessionID, addr net.Addr) {
log.Infof("Releasing incoming peer %s@%s", id, addr)
s.clientMtx.Lock()
peer, ok := s.clients[*id]
delete(s.clients, *id)
s.clientMtx.Unlock()
if ok {
peer.Close()
}
}
// removeAllPeers iterates through the server's current set of peers and closes
// all open connections.
func (s *Server) removeAllPeers() {
s.clientMtx.Lock()
defer s.clientMtx.Unlock()
for id, peer := range s.clients {
log.Infof("Releasing incoming peer %s@%s", id,
peer.RemoteAddr())
delete(s.clients, id)
peer.Close()
}
}
// logMessage writes information about a message exchanged with a remote peer,
// using directional prepositions to signal whether the message was sent or
// received.
func logMessage(peer Peer, msg wtwire.Message, read bool) {
var action = "Received"
var preposition = "from"
if !read {
action = "Sending"
preposition = "to"
}
summary := wtwire.MessageSummary(msg)
if len(summary) > 0 {
summary = "(" + summary + ")"
}
log.Debugf("%s %s%v %s %x@%s", action, msg.MsgType(), summary,
preposition, peer.RemotePub().SerializeCompressed(),
peer.RemoteAddr())
}
// noDial is a dummy dial method passed to the server's connmgr.
func noDial(_ net.Addr) (net.Conn, error) {
return nil, fmt.Errorf("watchtower cannot make outgoing conns")
}