-
Notifications
You must be signed in to change notification settings - Fork 2.1k
/
state_update.go
157 lines (129 loc) · 4.07 KB
/
state_update.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
package wtserver
import (
"fmt"
"github.com/lightningnetwork/lnd/watchtower/wtdb"
"github.com/lightningnetwork/lnd/watchtower/wtwire"
)
// handleStateUpdates drives a client's stream of StateUpdate messages. The
// caller passes in the first update it has already read; every following
// update is read from the peer until one is marked IsComplete, a message of a
// different type arrives, processing an update fails, or the server is
// shutting down.
func (s *Server) handleStateUpdates(peer Peer, id *wtdb.SessionID,
	update *wtwire.StateUpdate) error {

	// The first pass uses the update handed in by the caller. The loop's
	// post statement clears the pending update, which forces each later
	// pass to read a fresh message from the peer.
	for pending := update; ; pending = nil {
		if pending == nil {
			msg, err := s.readMessage(peer)
			if err != nil {
				return err
			}

			// Only StateUpdate messages are valid at this point in
			// the stream.
			stateUpdate, isUpdate := msg.(*wtwire.StateUpdate)
			if !isUpdate {
				return fmt.Errorf("client sent %T after "+
					"StateUpdate", msg)
			}
			pending = stateUpdate
		}

		// Attempt to apply the update on behalf of the client.
		if err := s.handleStateUpdate(peer, id, pending); err != nil {
			return err
		}

		// The client flagged this as its final update, so the stream
		// is finished.
		if pending.IsComplete == 1 {
			return nil
		}

		// Bail out before reading another update if the server has
		// been asked to quit.
		select {
		case <-s.quit:
			return ErrServerExiting
		default:
		}
	}
}
// handleStateUpdate processes a StateUpdate message request from a client. An
// attempt will be made to insert the update into the db, where it is validated
// against the client's session. The possible errors are then mapped back to
// StateUpdateCodes specified by the watchtower wire protocol, and sent back
// using a StateUpdateReply message.
//
// If the server is configured with NoAckUpdates, no reply is sent and a
// connection failure carrying the resulting code is returned instead, even
// when the update was accepted.
func (s *Server) handleStateUpdate(peer Peer, id *wtdb.SessionID,
	update *wtwire.StateUpdate) error {

	var (
		lastApplied uint16
		failCode    wtwire.ErrorCode
		err         error
	)

	sessionUpdate := wtdb.SessionStateUpdate{
		ID:            *id,
		Hint:          update.Hint,
		SeqNum:        update.SeqNum,
		LastApplied:   update.LastApplied,
		EncryptedBlob: update.EncryptedBlob,
	}

	lastApplied, err = s.cfg.DB.InsertStateUpdate(&sessionUpdate)
	switch {
	case err == nil:
		log.Debugf("State update %d accepted for %s",
			update.SeqNum, id)

		failCode = wtwire.CodeOK

	// Return a permanent failure if a client tries to send an update for
	// which we have no session.
	case err == wtdb.ErrSessionNotFound:
		failCode = wtwire.CodePermanentFailure

	case err == wtdb.ErrSeqNumAlreadyApplied:
		failCode = wtwire.CodePermanentFailure

		// TODO(conner): remove session state for protocol
		// violation. Could also double as clean up method for
		// session-related state.

	case err == wtdb.ErrLastAppliedReversion:
		failCode = wtwire.StateUpdateCodeClientBehind

	case err == wtdb.ErrSessionConsumed:
		failCode = wtwire.StateUpdateCodeMaxUpdatesExceeded

	case err == wtdb.ErrUpdateOutOfOrder:
		failCode = wtwire.StateUpdateCodeSeqNumOutOfOrder

	// Any other error has no dedicated code and is reported to the client
	// as a temporary failure. Log it here, since the error itself is not
	// propagated past this point and its cause would otherwise be lost.
	default:
		log.Errorf("Unable to insert state update %d for %s: %v",
			update.SeqNum, id, err)

		failCode = wtwire.CodeTemporaryFailure
	}

	// With acks disabled, skip the reply and signal a connection failure
	// with whatever code was selected above.
	if s.cfg.NoAckUpdates {
		return &connFailure{
			ID:   *id,
			Code: failCode,
		}
	}

	return s.replyStateUpdate(
		peer, id, failCode, lastApplied,
	)
}
// replyStateUpdate sends a response to a StateUpdate from a client. If the
// status code in the reply is OK, the error from the write will be bubbled up.
// Otherwise, this method returns a connection error to ensure we don't continue
// communication with the client.
//
// A failed write is always logged together with its cause, since for non-OK
// codes the write error is not part of the returned value.
func (s *Server) replyStateUpdate(peer Peer, id *wtdb.SessionID,
	code wtwire.StateUpdateCode, lastApplied uint16) error {

	msg := &wtwire.StateUpdateReply{
		Code:        code,
		LastApplied: lastApplied,
	}

	err := s.sendMessage(peer, msg)
	if err != nil {
		log.Errorf("unable to send StateUpdateReply to %s: %v",
			id, err)
	}

	// Return the write error if the request succeeded.
	if code == wtwire.CodeOK {
		return err
	}

	// Otherwise the request failed, return a connection failure to
	// disconnect the client.
	return &connFailure{
		ID:   *id,
		Code: code,
	}
}