-
Notifications
You must be signed in to change notification settings - Fork 4
/
conn.go
253 lines (230 loc) · 7.23 KB
/
conn.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
package vara
import (
"errors"
"fmt"
"io"
"net"
"sync"
"time"
)
// conn wraps the modem's data port connection that is handed to clients.
// It implements net.Conn.
type conn struct {
	// Embedded modem. The data connection, command subscriptions, buffer
	// counter and connection state used by conn's methods are reached
	// through it.
	*Modem

	// remoteCall is the value reported by RemoteAddr.
	remoteCall string

	// lastWrite is stamped by Write right before data is handed to the data
	// connection. Close reads it to make sure at least two seconds have
	// passed since the last write before it sends DISCONNECT.
	lastWrite time.Time

	// closeOnce makes the body of Close run at most once.
	closeOnce sync.Once

	// closing is set by Close once a disconnect has been initiated. Flush and
	// Write check it to stop waiting on / feeding the TX buffer.
	closing bool
}
// newConn clears any deadline left on the modem's data connection and returns
// a conn bound to m for a session with remoteCall.
func (m *Modem) newConn(remoteCall string) *conn {
	// Best effort: a failure to reset the deadline is deliberately ignored.
	var noDeadline time.Time
	_ = m.dataConn.SetDeadline(noDeadline)

	c := &conn{Modem: m, remoteCall: remoteCall}
	return c
}
// Flush blocks until the modem reports an empty TX buffer.
//
// It returns nil immediately if the connection is closing, io.EOF if the
// link is disconnected while waiting, ErrModemClosed if the command
// subscription is closed, and a timeout error if no BUFFER update arrives
// for a full minute.
func (v *conn) Flush() error {
	debugPrint("Flushing...")
	defer debugPrint("Flushed")

	updates, unsubscribe := v.cmds.Subscribe("DISCONNECTED", "BUFFER")
	defer unsubscribe()

	if v.closing {
		return nil
	}

	// The timer is re-armed on every BUFFER update, so it only fires when the
	// modem has been silent for the whole interval.
	const stallTimeout = time.Minute
	stall := time.NewTimer(stallTimeout)
	defer stall.Stop()

	for pending := v.bufferCount.get(); pending > 0; {
		select {
		case <-stall.C:
			return errors.New("flush: buffer timeout")
		case cmd, open := <-updates:
			if !open {
				return ErrModemClosed
			}
			if cmd == "DISCONNECTED" {
				return io.EOF
			}
			// BUFFER update: drain the timer if it already fired, then re-arm.
			if !stall.Stop() {
				<-stall.C
			}
			stall.Reset(stallTimeout)
			pending = parseBuffer(cmd)
		}
	}
	return nil
}
// SetDeadline applies t as both the read and the write deadline by forwarding
// it to the underlying data connection.
func (v *conn) SetDeadline(t time.Time) error {
	return v.dataConn.SetDeadline(t)
}
// SetWriteDeadline forwards the write deadline t to the underlying data
// connection.
func (v *conn) SetWriteDeadline(t time.Time) error {
	return v.dataConn.SetWriteDeadline(t)
}
// SetReadDeadline forwards the read deadline t to the underlying data
// connection.
func (v *conn) SetReadDeadline(t time.Time) error {
	return v.dataConn.SetReadDeadline(t)
}
// LocalAddr returns the local network address: an Addr built from the
// modem's myCall.
func (v *conn) LocalAddr() net.Addr {
	return Addr{v.myCall}
}
// RemoteAddr returns the remote network address: an Addr built from the
// remoteCall this conn was created with.
func (v *conn) RemoteAddr() net.Addr {
	return Addr{v.remoteCall}
}
// Close closes the connection.
//
// Any blocked Read or Write operations will be unblocked and return errors.
//
// Only the first call does any work; subsequent calls return nil. The first
// call returns ErrModemClosed if the modem is (or becomes) closed, nil if the
// link was already down or the disconnect completed, and a timeout error if
// the modem did not confirm the disconnect within 60 seconds (in which case
// the connection is aborted).
func (v *conn) Close() error {
	var closeErr error
	v.closeOnce.Do(func() {
		debugPrint("Closing connection...")
		if v.Modem.closed {
			closeErr = ErrModemClosed
			return
		}

		// Registered first so it runs last: once everything else is done,
		// drain whatever is still pending on the data connection (for at most
		// one second).
		defer func() {
			v.dataConn.SetReadDeadline(time.Now().Add(time.Second))
			discarded, _ := io.Copy(io.Discard, v.dataConn)
			debugPrint("close: discarded %d bytes of remaining data", discarded)
		}()

		v.closing = true
		stateChange, unsubscribe := v.cmds.Subscribe("DISCONNECTED")
		defer unsubscribe()

		if v.connectedState == disconnected {
			// Nothing to tear down; the link is already gone.
			return
		}

		const (
			settleTime        = 2 * time.Second
			disconnectTimeout = 60 * time.Second
		)

		// Workaround for a race between write and close: commands and data
		// travel on separate TCP sockets and are not synchronized. VARA
		// promises that DISCONNECT flushes the TX buffer before the link is
		// closed, but the most recently written data must have reached the
		// modem before DISCONNECT is issued, so give it time to settle.
		if elapsed := time.Since(v.lastWrite); elapsed < settleTime {
			time.Sleep(settleTime - elapsed)
		}
		v.writeCmd("DISCONNECT")

		select {
		case _, open := <-stateChange:
			if !open {
				closeErr = ErrModemClosed
			}
			// Otherwise this is the happy path: the connection was closed
			// gracefully and closeErr stays nil.
		case <-time.After(disconnectTimeout):
			debugPrint("disconnect timeout - aborting connection")
			v.Abort()
			closeErr = fmt.Errorf("disconnect timeout - connection aborted")
		}
	})
	return closeErr
}
// Read reads data from the connection into b.
//
// It returns io.EOF when the connection is not in the connected state or when
// the link is disconnected while waiting for data, and ErrModemClosed when
// the command subscription is closed. Any read deadline on the data
// connection is cleared before reading.
func (v *conn) Read(b []byte) (n int, err error) {
	connectChange, cancel := v.cmds.Subscribe("DISCONNECTED")
	defer cancel()

	if v.connectedState != connected {
		debugPrint("read: not connected")
		return 0, io.EOF
	}

	type result struct {
		n   int
		err error
	}
	ready := make(chan result, 1)
	go func() {
		defer close(ready)
		v.dataConn.SetReadDeadline(time.Time{}) // Disable read deadline

		// Keep the outcome in goroutine-local variables. Assigning to Read's
		// named results here would race with the return statements below,
		// which can execute while this goroutine is still running.
		rn, rerr := v.dataConn.Read(b)
		if rerr != nil {
			debugPrint("read error: %v", rerr)
		}
		ready <- result{rn, rerr}
	}()

	select {
	case r := <-ready:
		// We got data. Return it :)
		return r.n, r.err
	case _, ok := <-connectChange:
		debugPrint("read: disconnected while reading")
		if !ok {
			return 0, ErrModemClosed
		}
		// Workaround for race condition between cmd and data conn.
		// The data was of course sent before the DISCONNECT, but they are received
		// out of order since they're sent from the modem on independent streams.
		select {
		case r := <-ready:
			debugPrint("read: got data (%d bytes) after disconnect (err: %v)", r.n, r.err)
			if r.err != nil {
				return r.n, io.EOF
			}
			return r.n, nil
		case <-time.After(2 * time.Second):
			debugPrint("read: timeout waiting for data after disconnect")
			// Set a read deadline to ensure the above Read call is cancelled after we return.
			v.dataConn.SetReadDeadline(time.Now())
			return 0, io.EOF
		}
	}
}
// Write writes b to the connection, blocking while the modem's TX buffer is
// considered full.
//
// It returns io.EOF when the connection is not connected, is disconnected
// while waiting for buffer space, or is being closed; ErrModemClosed when the
// command subscription is closed; and a timeout error if the buffer is full
// and no BUFFER update arrives for a full minute.
func (v *conn) Write(b []byte) (int, error) {
	cmds, cancel := v.cmds.Subscribe("DISCONNECTED", "BUFFER")
	defer cancel()

	if v.connectedState != connected {
		return 0, io.EOF
	}

	// Throttle to match the transmitted data rate by blocking if the tx buffer size is getting much bigger
	// than the payloads being sent.
	//
	// Yes, a magic number. We don't know the actual on-air packet length and/or max outstanding frames of
	// the mode in use. We also don't know how often the modem sends BUFFER updates. If the number is too
	// small, we end up causing unnecessary IDLE time. Too large and we end up with non-blocking writes and
	// a very large TX buffer causing Close() to block for a very long time. This magic number seems to work
	// well enough for both VARA FM and VARA HF.
	const magicNumber = 7
	bufferTimeout := time.NewTimer(time.Minute)
	defer bufferTimeout.Stop()

	bufferCount := v.bufferCount.get()
	// An empty payload is never throttled: with len(b) == 0 the threshold is 0,
	// which any non-negative buffer count satisfies, so the loop would block
	// until a disconnect or the buffer timeout.
	for len(b) > 0 && bufferCount >= magicNumber*len(b) && !v.closing {
		debugPrint("write: buffer full (%d >= %d)", bufferCount, magicNumber*len(b))
		select {
		case cmd, ok := <-cmds:
			switch {
			case !ok:
				return 0, ErrModemClosed
			case cmd == "DISCONNECTED":
				debugPrint("write: state changed while waiting for buffer space")
				return 0, io.EOF
			default:
				// BUFFER update: record the new count and re-arm the timer,
				// draining it first if it already fired.
				bufferCount = parseBuffer(cmd)
				if !bufferTimeout.Stop() {
					<-bufferTimeout.C
				}
				bufferTimeout.Reset(time.Minute)
			}
		case <-bufferTimeout.C:
			// This is most likely due to a app<->tnc bug, but might also be due
			// to stalled connection.
			return 0, fmt.Errorf("write: buffer timeout")
		}
	}

	// VARA keeps accepting data after a DISCONNECT command has been sent, adding it to the TX buffer queue.
	// Since VARA keeps the connection open until the TX buffer is empty, we need to make sure we don't
	// keep feeding the buffer after we've sent the DISCONNECT command.
	// To do this, we block until the disconnect is complete.
	if v.closing && v.connectedState == connected {
		debugPrint("write: waiting for disconnect to complete...")
		// Ends on DISCONNECTED or when the subscription channel is closed.
		for cmd := range cmds {
			if cmd == "DISCONNECTED" {
				break
			}
		}
		debugPrint("write: disconnect complete")
		return 0, io.EOF
	}

	// Modem is ready to receive more data :-)
	debugPrint("write: sending %d bytes", len(b))
	v.bufferCount.incr(len(b))
	v.lastWrite = time.Now()
	return v.dataConn.Write(b)
}
// TxBufferLen implements the transport.TxBuffer interface.
//
// It reports the current value of the modem's buffer counter: the number of
// bytes in the TX buffer queue or in transit to the modem.
func (v *conn) TxBufferLen() int {
	return v.bufferCount.get()
}