-
Notifications
You must be signed in to change notification settings - Fork 0
/
net.go
425 lines (386 loc) · 11.7 KB
/
net.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
// Copyright 2017 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.
package testutils
import (
"io"
"net"
"sync"
"github.com/pkg/errors"
"golang.org/x/net/context"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
)
// bufferSize is the capacity, in bytes, of each of the two per-direction
// buffers (client-to-server and server-to-client) that NewPartitionableConn
// creates. Writes to a partitioned connection will block once the buffer for
// that direction has been filled.
const bufferSize = 16 << 10 // 16 KB
// PartitionableConn is an implementation of net.Conn that allows the
// client->server and/or the server->client directions to be temporarily
// partitioned.
//
// A PartitionableConn wraps a provided net.Conn (the serverConn member) and
// forwards every read and write to it. It interposes an arbiter in front of it
// that's used to block reads/writes while the PartitionableConn is in the
// partitioned mode.
//
// While a direction is partitioned, data sent in that direction doesn't flow. A
// write while partitioned will block after an internal buffer gets filled. Data
// written to the conn after the partition has been established is not delivered
// to the remote party until the partition is lifted. At that time, all the
// buffered data is delivered. Since data is delivered asynchronously, data
// written before the partition is established may or may not be blocked by the
// partition; use application-level ACKs if that's important.
type PartitionableConn struct {
// We embed a net.Conn so that we inherit the interface. Note that we override
// Read() and Write().
//
// This embedded Conn is half of a net.Pipe(). The other half is clientConn.
net.Conn
// clientConn is the internal half of the net.Pipe(): data the user writes to
// the PartitionableConn is read from clientConn, and data written to
// clientConn is what the user reads back.
clientConn net.Conn
// serverConn is the wrapped connection that was passed to
// NewPartitionableConn().
serverConn net.Conn
mu struct {
syncutil.Mutex
// err, if set, is returned by any subsequent call to Read or Write.
err error
// Whether each direction (client-to-server, server-to-client) is
// currently partitioned.
c2sPartitioned bool
s2cPartitioned bool
// Per-direction buffers holding data that has been read from one side
// but not yet written to the other. They share this struct's mutex.
c2sBuffer buf
s2cBuffer buf
// Conds to be signaled when the corresponding partition is lifted.
c2sWaiter *sync.Cond
s2cWaiter *sync.Cond
}
}
// buf is a bounded byte buffer sitting between a producer that calls Write()
// and a consumer that calls readLocked(). All of its fields are protected by
// the embedded mutex.
type buf struct {
// A mutex used to synchronize access to all the fields. It will be set to the
// parent PartitionableConn's mutex.
*syncutil.Mutex
// data holds the bytes that have been written but not yet read.
data []byte
// capacity is the maximum number of bytes data may hold; Write() blocks
// while len(data) == capacity.
capacity int
// closed is set by Close(). Once set, Write() fails with closedErr, and
// readLocked() fails with closedErr as soon as data has been drained.
closed bool
// The error that was passed to Close(err). See Close() for more info.
closedErr error
name string // A human-readable name, useful for debugging.
// readerWait is signaled when the reader should wake up and check the
// buffer's state: when new data is put in the buffer, when the buffer is
// closed, and whenever the PartitionableConn wants to unblock all reads (i.e.
// on partition).
readerWait *sync.Cond
// capacityWait is signaled when a blocked writer should wake up because data
// is taken out of the buffer and there's now some capacity. It's also
// signaled when the buffer is closed.
capacityWait *sync.Cond
}
// makeBuf returns an empty, open buf that can hold up to capacity bytes.
//
// mu becomes the buffer's mutex: it guards every field of the buf and is the
// Locker behind both of the buffer's condition variables. name is only a
// human-readable label.
func makeBuf(name string, capacity int, mu *syncutil.Mutex) buf {
	return buf{
		Mutex:        mu,
		name:         name,
		capacity:     capacity,
		readerWait:   sync.NewCond(mu),
		capacityWait: sync.NewCond(mu),
	}
}
// Write appends data to the buffer and wakes the reader.
//
// While the buffer is completely full (and not closed), Write blocks on
// capacityWait. Once there is room, it copies as much of data as fits, so the
// write may be partial. If the buffer is closed, nothing is written and the
// error passed to Close() is returned.
//
// It returns the number of bytes actually copied into the buffer.
func (b *buf) Write(data []byte) (int, error) {
	b.Lock()
	defer b.Unlock()

	// Sleep until a reader frees up some room or the buffer gets closed.
	for !b.closed && len(b.data) == b.capacity {
		b.capacityWait.Wait()
	}
	if b.closed {
		return 0, b.closedErr
	}

	// Accept whatever fits; the caller is responsible for retrying the rest.
	n := len(data)
	if free := b.capacity - len(b.data); free < n {
		n = free
	}
	b.data = append(b.data, data[:n]...)
	b.wakeReaderLocked()
	return n, nil
}
// errEAgain is returned by buf.readLocked() when it found the buffer empty and
// had to wait on buf.readerWait. Once woken (by new data, by the buffer being
// closed, or by the PartitionableConn interrupting the read because of a
// partition), readLocked() returns this error rather than data. The caller is
// expected to try the read again after the partition, if any, is gone.
var errEAgain = errors.New("try read again")
// readLocked removes and returns up to "size" bytes from the front of the
// buffer. It must be called with the buffer's mutex held.
//
// If the buffer is empty and closed, the error passed to Close() is returned.
// If the buffer is empty and still open, readLocked waits once on readerWait
// and then returns errEAgain without looking at the buffer again: the wake-up
// may have been caused by data arriving, by the buffer being closed, or by a
// partition being installed, and the caller has to re-check for a partition
// before retrying.
//
// A successful read signals capacityWait so that a blocked writer can proceed.
func (b *buf) readLocked(size int) ([]byte, error) {
	if len(b.data) == 0 {
		if b.closed {
			return nil, b.closedErr
		}
		b.readerWait.Wait()
		// Let the caller decide whether it's still OK to consume data.
		return nil, errEAgain
	}

	var chunk []byte
	if size > len(b.data) {
		// Everything that's buffered fits in the request; hand it all over.
		chunk, b.data = b.data, nil
	} else {
		chunk, b.data = b.data[:size], b.data[size:]
	}
	b.capacityWait.Signal()
	return chunk, nil
}
// Close marks the buffer as closed and records err as the error to hand out
// from then on: subsequent Write() calls fail with it, and reads fail with it
// once the data that's still buffered has been consumed.
//
// readerWait and capacityWait are each signaled once, so that a reader and a
// writer that are currently blocked wake up and notice the closed buffer.
// Calling Close again replaces the recorded error.
func (b *buf) Close(err error) {
	b.Lock()
	defer b.Unlock()

	b.closed, b.closedErr = true, err
	b.readerWait.Signal()
	b.capacityWait.Signal()
}
// wakeReaderLocked wakes the reader in case it's blocked, by signaling
// readerWait. A reader woken this way returns errEAgain from readLocked()
// rather than data, so this can also be used purely to make the reader
// re-evaluate its state. See comments on readerWait.
//
// This needs to be called while holding the buffer's mutex.
func (b *buf) wakeReaderLocked() {
b.readerWait.Signal()
}
// NewPartitionableConn wraps serverConn in a PartitionableConn.
//
// The returned connection embeds one end of a net.Pipe(). Two background
// goroutines are started, one per direction, which move data between the
// other end of the pipe (clientConn) and serverConn through c2sBuffer and
// s2cBuffer respectively. When a direction's copy loop terminates, its error
// is stored in c.mu.err, so that later Read()/Write() calls on the
// PartitionableConn fail with it, and both clientConn and serverConn are
// closed.
func NewPartitionableConn(serverConn net.Conn) *PartitionableConn {
	clientEnd, clientConn := net.Pipe()
	c := &PartitionableConn{
		Conn:       clientEnd,
		clientConn: clientConn,
		serverConn: serverConn,
	}
	c.mu.c2sWaiter = sync.NewCond(&c.mu.Mutex)
	c.mu.s2cWaiter = sync.NewCond(&c.mu.Mutex)
	c.mu.c2sBuffer = makeBuf("c2sBuf", bufferSize, &c.mu.Mutex)
	c.mu.s2cBuffer = makeBuf("s2cBuf", bufferSize, &c.mu.Mutex)

	// forward runs the copy loop for a single direction until it fails, then
	// records the failure and tears both connections down. It is shared by
	// the two directions so that they handle termination identically.
	forward := func(src, dst net.Conn, b *buf, waitForNoPartitionLocked func()) {
		err := c.copy(src, dst, b, waitForNoPartitionLocked)
		c.mu.Lock()
		c.mu.err = err
		c.mu.Unlock()
		// Both directions run this teardown, so each connection can end up
		// being closed twice. Close failures are therefore only logged, for
		// both directions alike; they never take the process down.
		if err := c.clientConn.Close(); err != nil {
			log.Errorf(context.TODO(), "unexpected error closing internal pipe: %s", err)
		}
		if err := c.serverConn.Close(); err != nil {
			log.Errorf(context.TODO(), "error closing server conn: %s", err)
		}
	}

	// Start copying from client to server.
	go forward(c.clientConn, c.serverConn, &c.mu.c2sBuffer,
		func() { // waitForNoPartitionLocked
			for c.mu.c2sPartitioned {
				c.mu.c2sWaiter.Wait()
			}
		})

	// Start copying from server to client.
	go forward(c.serverConn, c.clientConn, &c.mu.s2cBuffer,
		func() { // waitForNoPartitionLocked
			for c.mu.s2cPartitioned {
				c.mu.s2cWaiter.Wait()
			}
		})

	return c
}
// Finish lifts both the client-to-server and the server-to-client partition,
// whether or not they are currently in place, and signals the corresponding
// waiters so that goroutines blocked on a partition can finish.
//
// Finish() must be called if a connection may have been left in a partitioned
// state.
func (c *PartitionableConn) Finish() {
	c.mu.Lock()
	defer c.mu.Unlock()

	c.mu.c2sPartitioned, c.mu.s2cPartitioned = false, false
	c.mu.c2sWaiter.Signal()
	c.mu.s2cWaiter.Signal()
}
// PartitionC2S partitions the client-to-server direction.
// If UnpartitionC2S() is not called, Finish() must be called.
//
// It panics if the direction is already partitioned. The mutex is released
// through a defer, so a caller that recovers from that panic doesn't leave the
// connection locked forever.
func (c *PartitionableConn) PartitionC2S() {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.mu.c2sPartitioned {
		panic("already partitioned")
	}
	c.mu.c2sPartitioned = true
	// Kick a reader that may be blocked waiting for data in the buffer, so
	// that it gets a chance to notice the partition before consuming anything.
	c.mu.c2sBuffer.wakeReaderLocked()
}
// UnpartitionC2S lifts an existing client-to-server partition and signals
// c2sWaiter so that the goroutine waiting for the partition to go away can
// resume.
//
// It panics if the direction is not currently partitioned. The mutex is
// released through a defer, so a caller that recovers from that panic doesn't
// leave the connection locked forever.
func (c *PartitionableConn) UnpartitionC2S() {
	c.mu.Lock()
	defer c.mu.Unlock()
	if !c.mu.c2sPartitioned {
		panic("not partitioned")
	}
	c.mu.c2sPartitioned = false
	c.mu.c2sWaiter.Signal()
}
// PartitionS2C partitions the server-to-client direction.
// If UnpartitionS2C() is not called, Finish() must be called.
//
// It panics if the direction is already partitioned. The mutex is released
// through a defer, so a caller that recovers from that panic doesn't leave the
// connection locked forever.
func (c *PartitionableConn) PartitionS2C() {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.mu.s2cPartitioned {
		panic("already partitioned")
	}
	c.mu.s2cPartitioned = true
	// Kick a reader that may be blocked waiting for data in the buffer, so
	// that it gets a chance to notice the partition before consuming anything.
	c.mu.s2cBuffer.wakeReaderLocked()
}
// UnpartitionS2C lifts an existing server-to-client partition and signals
// s2cWaiter so that the goroutine waiting for the partition to go away can
// resume.
//
// It panics if the direction is not currently partitioned. The mutex is
// released through a defer, so a caller that recovers from that panic doesn't
// leave the connection locked forever.
func (c *PartitionableConn) UnpartitionS2C() {
	c.mu.Lock()
	defer c.mu.Unlock()
	if !c.mu.s2cPartitioned {
		panic("not partitioned")
	}
	c.mu.s2cPartitioned = false
	c.mu.s2cWaiter.Signal()
}
// Read is part of the net.Conn interface.
//
// If an error has been recorded on the connection (c.mu.err), it is returned
// right away and nothing is read. Otherwise the call is handed to the embedded
// Conn, outside of the mutex.
func (c *PartitionableConn) Read(b []byte) (n int, err error) {
	c.mu.Lock()
	stickyErr := c.mu.err
	c.mu.Unlock()

	if stickyErr == nil {
		// Forward to the embedded connection.
		return c.Conn.Read(b)
	}
	return 0, stickyErr
}
// Write is part of the net.Conn interface.
//
// If an error has been recorded on the connection (c.mu.err), it is returned
// right away and nothing is written. Otherwise the call is handed to the
// embedded Conn, outside of the mutex.
func (c *PartitionableConn) Write(b []byte) (n int, err error) {
	c.mu.Lock()
	stickyErr := c.mu.err
	c.mu.Unlock()

	if stickyErr == nil {
		// Forward to the embedded connection.
		return c.Conn.Write(b)
	}
	return 0, stickyErr
}
// readFrom copies data from src into the buffer until src.Read() returns an
// error (e.g. io.EOF) or writing to the buffer fails. That error is returned;
// the return value is never nil.
//
// Bytes that src.Read() returns together with an error are put in the buffer
// before the error is reported, as the io.Reader contract requires: a reader
// is allowed to hand back its final bytes along with io.EOF.
//
// readFrom is written in the spirit of interface io.ReaderFrom, except it
// returns the io.EOF error, and also doesn't guarantee that every byte that has
// been read from src is put into the buffer (as the buffer allows concurrent
// access and buf.Write can return an error).
func (b *buf) readFrom(src io.Reader) error {
	data := make([]byte, 1024)
	for {
		nr, err := src.Read(data)
		// Deal with whatever was read before looking at err. buf.Write may
		// perform partial writes, so keep going until the chunk is consumed.
		// A zero-byte read needs no Write at all.
		for toSend := data[:nr]; len(toSend) > 0; {
			nw, ew := b.Write(toSend)
			if ew != nil {
				return ew
			}
			toSend = toSend[nw:]
		}
		if err != nil {
			return err
		}
	}
}
// copyFromBuffer copies data from src to dst until reading from src or writing
// to dst fails. The error that stopped the copy is returned; the return value
// is always != nil (io.EOF is used if the read produced neither data nor an
// error). This is because the PartitionableConn wants to hold on to any error,
// including EOF.
//
// If dst.Write() fails, its error is the one returned, even when the write was
// also short; io.ErrShortWrite is only reported for a short write that came
// without an error.
//
// waitForNoPartitionLocked is a function to be called before consuming data
// from src, in order to make sure that we only consume data when we're not
// partitioned. It needs to be called under src.Mutex, as the check needs to be
// done atomically with consuming the buffer's data.
func (c *PartitionableConn) copyFromBuffer(
	src *buf, dst net.Conn, waitForNoPartitionLocked func(),
) error {
	for {
		// Don't read from the buffer while we're partitioned.
		src.Mutex.Lock()
		waitForNoPartitionLocked()
		data, err := src.readLocked(1024 * 1024)
		src.Mutex.Unlock()

		if len(data) == 0 {
			if err == errEAgain {
				// The read was interrupted (possibly by a partition); go back
				// and wait for the partition to be lifted before retrying.
				continue
			}
			if err == nil {
				return io.EOF
			}
			return err
		}

		nw, ew := dst.Write(data)
		if ew != nil {
			// Report the underlying failure rather than masking it with
			// io.ErrShortWrite: a failed write is usually short as well.
			return ew
		}
		if nw != len(data) {
			return io.ErrShortWrite
		}
		if err != nil {
			return err
		}
	}
}
// copy moves data from src to dst through buf, pausing delivery to dst while
// the direction is partitioned.
//
// Two goroutines do the work: one feeds buf from src, the other drains buf
// into dst (and is the one that honors partitions, through
// waitForNoPartitionLocked). Whenever either of them stops, it closes buf with
// the error it stopped with. copy waits for both to finish and returns the
// error of whichever finished first, falling back to the other one's error if
// the first was nil.
func (c *PartitionableConn) copy(
	src net.Conn, dst net.Conn, buf *buf, waitForNoPartitionLocked func(),
) error {
	results := make(chan error)

	// run executes one half of the pipeline, closes the buffer with its
	// outcome and then reports that outcome.
	run := func(step func() error) {
		err := step()
		buf.Close(err)
		results <- err
	}
	go run(func() error { return buf.readFrom(src) })
	go run(func() error { return c.copyFromBuffer(buf, dst, waitForNoPartitionLocked) })

	first, second := <-results, <-results
	if first != nil {
		return first
	}
	return second
}