/
socket.go
525 lines (429 loc) · 16.3 KB
/
socket.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
package p2p
import (
"bufio"
"context"
"io"
"math"
"net"
"os"
"sync"
"time"
"github.com/pokt-network/pocket/p2p/types"
sharedTypes "github.com/pokt-network/pocket/shared/types"
"github.com/pokt-network/pocket/shared/utils"
"go.uber.org/atomic"
)
type SocketEventMonitor func(context.Context, *socket) error
type RWBUffer struct {
// the read buffer is a byte slice of a certain size (configurable: `ReadBufferSize``) which is destined
// to receive incoming data. This buffer is not concurrent and is primarily used by the s.read routine
read *types.Buffer
// the write buffer is a byte slice of a certain size (configurable: `WriteBufferSize``) which is written to
// by the owner of this socket (i.e: the runner, the peer). When the peer is done writing to the buffer, the s.write routine
// proceeds to writing this buffer to the concerned connection, resulting in a network send.
// This buffer is concurrent because two operations are happening in parallel, the writing to the buffer by the peer
// and the reading off of the buffer by the s.write routine
write *types.ConcurrentBuffer
}
// A "socket" (not to be confused with the OS' socket) is an abstraction around the net.Conn go interface,
// whose purpose is to represent a p2p connection with full "read/write" capabilities.
//
// Both read and write operations are buffered, and both buffer sizes are configurable.
//
// Configuration paramters are directly assigned to the socket struct.
//
// 1 live p2p connection = 1 socket
type socket struct {
// the agent responsible for running/creating/managing sockets
runner types.Runner
// configuration parameters
headerLength uint
bufferSize uint
readTimeout uint
addr string
kind types.SocketType // inbound or outbound
// the actual network socket
conn net.Conn
// the wire codec
codec *wireCodec
// io buffers
buffers *RWBUffer
// the io reader/writer
reader *bufio.Reader
writer *bufio.Writer
// the map to track writes that expects acknowledgements
// we call them requests (as they require responses)
requests *types.RequestMap
// turns true when the socket is opened (i.e., the connection is established and IO routines are launched)
isOpen atomic.Bool
isWriting atomic.Bool
isReading atomic.Bool
// For reference, see these resources on the use of empty structs in go channels:
// - https://dave.cheney.net/2014/03/25/the-empty-struct
// - https://dave.cheney.net/2013/04/30/curious-channels
ready chan struct{} // when the socket is opened and IO starts, this channel gets closed to signal readiness
done chan struct{} // if this channel is closed or receives and input, it stops the socket and IO operations
writing chan struct{} // when the writing starts, this channel receives a new input; closes when done writing (i.e: stopped the socket)
reading chan struct{} // when the reading starts, this channel receives a new input; closes when done reading (i.e: stopped the socket)
errored chan struct{} // on error, this channel receives a new input to signal the happening of an error
ioStarted chan struct{} // on start of IO, this channel closes to signal out that the IO has kicked off
err struct { // the reference to store the encountered error
sync.Mutex
error
}
logger types.Logger
}
// A constructor to create a socket
func NewSocket(readBufferSize, packetHeaderLength, readTimeoutInMs uint) *socket {
pipe := &socket{
codec: newWireCodec(),
kind: types.UndefinedSocketType,
headerLength: packetHeaderLength,
bufferSize: readBufferSize,
readTimeout: readTimeoutInMs,
buffers: &RWBUffer{
read: types.NewBuffer(readBufferSize),
write: types.NewConcurrentBuffer(0),
},
requests: types.NewRequestMap(math.MaxUint32),
ready: make(chan struct{}), // closes to signal the readiness of the socket
done: make(chan struct{}), // closes to signal the closing of the socket
writing: make(chan struct{}), // sends a new input to signal the start of the writing routine, closes when done writing
reading: make(chan struct{}), // sends new input to signal the start of the reading routine, closes when done reading
ioStarted: make(chan struct{}), // closes to signal the start of IO
// sends new input to signal the encoutering of an error in running routines
// closes when the socket closes.
// Bufferred to 1 to allow non-blocking signaling of types. and blocking awaiting of error signals.
// (We are handling 1 error at most, so not more than one signal is expected to be received at a time, establishing a queue of exactly 1 error...)
errored: make(chan struct{}, 1),
logger: types.NewLogger(os.Stdout),
}
return pipe
}
// Retrieves the underlying TCP socket (net.Conn) in question through the connector argument and starts
// the IO operations on that socket, while also putting in place event handlers for onSocketOpened
// and onSocketClosed events.
func (s *socket) open(ctx context.Context, connector func() (string, types.SocketType, net.Conn), onOpened SocketEventMonitor, onClosed SocketEventMonitor) error {
s.buffers.write.Open()
addr, socketType, conn := connector()
if utils.IsEmpty(addr) {
return sharedTypes.ErrMissingRequiredArg("address")
}
if utils.IsEmpty(string(socketType)) {
return sharedTypes.ErrMissingRequiredArg("socketType")
}
switch socketType {
case types.Outbound:
case types.Inbound:
default:
s.close()
return sharedTypes.ErrUndefinedSocketType(string(socketType))
}
go s.startIO(ctx, socketType, addr, conn, onOpened, onClosed)
select {
case _, open := <-s.ioStarted: // wait for the IO to start, closes on failure, signals on success
if !open {
return sharedTypes.ErrSocketIOStartFailed(string(socketType))
}
case <-s.errored:
return s.err.error
}
s.signalOpen()
s.signalReady()
return nil
}
// Stops the ongoing IO operations gracefully (i.e., s.read and s.write routines) and closes the
// underlying network connection (s.conn), as well as all opened channels. Finally, close() signals
// that the closing process has went successfully by sending a signal on s.closed.
// NOTE: This method does not block.
func (s *socket) close() {
if !s.isOpen.Load() {
return
}
close(s.done)
s.stopErrorReporting()
s.buffers.write.Close()
if s.conn != nil {
s.conn.Close()
}
s.signalClose()
}
// Creates a reader and writer for the network connection and kicks off the onSocketOpened event handler.
// This also launches both the reading and writing routines (read, write methods), both of which are blocking.
// When the write routine exists, the onSocketClosed event handler kicks off and returns an error if there is one.
// NOTE: The write and read routines will both exit for the same reasons, so having just one of them block is sufficient.
func (s *socket) startIO(ctx context.Context, kind types.SocketType, addr string, conn net.Conn, onOpened SocketEventMonitor, onClosed SocketEventMonitor) {
defer s.close()
s.addr = addr
s.kind = kind
s.conn = conn
s.reader = bufio.NewReader(conn)
s.writer = bufio.NewWriter(conn)
if err := onOpened(ctx, s); err != nil {
s.error(err)
close(s.writing)
close(s.reading)
return
}
go s.read(context.Context(ctx)) // closes s.reading when done
go s.write(context.Context(ctx)) // closes s.writing when done
_, open := <-s.writing
if !open {
s.logger.Error("Socket stopped writing after starting...")
s.signalIoFailure()
return
}
_, open = <-s.reading
if !open {
s.logger.Error("Socket stopped reading after starting...")
s.signalIoFailure()
return
}
s.signalIoStarted()
s.logger.Info("Running...")
select {
case <-s.runner.Done(): // if the socket runner is done, the socket moves on to run onClosed
if !s.isOpen.Load() {
s.close()
}
case <-ctx.Done(): // if the context is done, the socket moves on to run onClosed
if !s.isOpen.Load() {
s.close()
}
case <-s.done: // if the socket is done, the socket moves on to run onClosed
}
s.logger.Info("Closing the socket")
if err := onClosed(ctx, s); err != nil {
s.error(err)
return
}
s.logger.Info("Closed")
}
// The TLS handshake algorithm to establish encrypted connections
func (s *socket) handshake() {
panic("Not implemented")
}
// Reads a chunk (of size `readbufferSize`) out of the TCP connection using `s.reader`.
//
// This is used by the read routine (`s.read`) to perform buffered reads.
// To achieve buffered reading, `readChunk` first off reads the header bytes (first bytes from 0 to `headerLength``)
// to retrieve size of received/to-be-read payload. If the payload size exceeds the configured max,
// `readChunk` will error out, the other end will receive a `ErrPayloadTooBig` error.
// TODO(derrandz): implement erroring logic to send this error.
//
// NOTE: After reading the header, readChunk blocks until the full body length is read.
func (s *socket) readChunk() ([]byte, int, error) {
var n int
readBuffer := s.buffers.read.Ref()
if _, err := io.ReadFull(s.reader, (*readBuffer)[:s.headerLength]); err != nil {
return nil, 0, err
}
_, _, bodyLen, err := s.codec.decodeHeader((*readBuffer)[:s.headerLength])
if err != nil {
return nil, 0, err
}
// TODO(derrandz): replace with configurable max value or keep it as is (i.e: max=chunk size) ??
if bodyLen > uint32(s.bufferSize-s.headerLength) {
return nil, 0, sharedTypes.ErrPayloadTooBig(uint(bodyLen), s.bufferSize-s.headerLength)
}
if n, err = io.ReadFull(s.reader, (*readBuffer)[s.headerLength:uint32(s.headerLength)+bodyLen]); err != nil {
return nil, 0, err
}
buff := make([]byte, 0)
buff = append(buff, (*readBuffer)[:s.headerLength+uint(n)]...)
return buff, n, err
}
// The read routine.
//
// This routine performs buffered reads (the size of the buffer is the config param: `readBufferSize`) on the
// established connection and does two things as a consequence of a read operation:
// 1. If it's a response to a request out of this socket, it will redirect the response to the request's response channel (see `types/request.go`)
// 2. If not, it will handover the read (past-tense participle) buffer as a Packet (check `types/packet.go`) to the runner (check `types/runner.go`)
//
// This routine halts if:
// - The cancelable context cancels
// - If the runner stops (i.e: s.runner.Done() receives)
// - If the socket closes
// - If some party signals that this routine should stop (by closing the s.reading channel)
// - If there is an IO error: EOF, UnexpectedEOF, peer hang up, unexpected error
func (s *socket) read(ctx context.Context) {
defer s.signalReadingStop()
s.signalReadingStart()
reader:
for {
select {
case <-ctx.Done(): // stop if context cancels
break reader
case <-s.runner.Done(): // stop if runner stops
break reader
case <-s.done: // stop if the socket closes
break reader
default:
{
buf, n, err := s.readChunk()
if err != nil {
switch err {
case io.EOF:
s.error(sharedTypes.ErrPeerHangUp(err))
break reader
case io.ErrUnexpectedEOF:
s.error(sharedTypes.ErrPeerHangUp(err))
break reader
default:
s.error(sharedTypes.ErrUnexpected(err))
break reader
}
}
if n == 0 {
s.logger.Warn("Read 0 bytes on socket:", s.addr)
continue
}
nonce, _, data, wrapped, err := s.codec.decode(buf)
if err != nil {
s.error(err)
break reader
}
// A non-zero nonce happens on nonced-respones (i.e., responses to already sent requests).
// Using the non-zero nonce, we are able to fetch the existing (waiting) request from
// the request map and pull out the channel on which this request expects to receive a response.
if nonce != 0 {
_, ch, found := s.requests.Find(nonce)
if !found {
s.logger.Warn("Received response with nonce but no request found:", nonce)
}
ch <- types.NewPacket(nonce, data, s.addr, wrapped)
close(ch)
continue
}
// TODO(derrandz): should we make this in a separate routine to avoid any potential issues?
s.runner.Sink() <- types.NewPacket(nonce, data, s.addr, wrapped)
}
}
}
}
// TODO(derrandz): Add buffered write by writing exactly `WriteBufferSize` amount of bytes, and splitting if larger.
// Will need to think about how to split big payloads and sequence them.
//
// Writes a chunk=writeBufferSize to the writer (s.writer).
// This operation is blocking, and blocks until a waiter is ready to receive the signal from the write buffer (signaling that is has written).
// This is used by send/request/broadcast operations.
// Upon each send, the write routine will receive a signal so that it may proceed to send the write over the network.
func (s *socket) writeChunk(b []byte, isErrorOf bool, reqNum uint32, wrapped bool) (uint, error) {
defer s.buffers.write.Unlock()
s.buffers.write.Lock()
writeBuffer := s.buffers.write.Ref()
buff := s.codec.encode(Binary, isErrorOf, reqNum, b, wrapped)
*writeBuffer = append(*writeBuffer, buff...)
s.buffers.write.Signal()
return uint(len(b)), nil // TODO(derrandz): should length be of b or of the encoded b
}
// writeChunkAckful is a writeChunk that expects to receive an ACK response for the chunk it has written.
// This method will create a request, which is basically a nonce and a channel, the nonce to identify the
// written chunk and the channel to receive the response for that particular written chunk.
//
// The channel - on which the response is expected to be received - is blocking, thus enables the 'wait to receive the response' behavior.
// The `read` routine takes care of identifying incoming responses (_using the nonce_) and redirecting them to the waiting channels of the currently-open requests.
func (s *socket) writeChunkAckful(b []byte, wrapped bool) (types.Packet, error) {
request := s.requests.Get()
requestNonce := request.Nonce
if _, err := s.writeChunk(b, false, requestNonce, wrapped); err != nil {
s.requests.Delete(requestNonce)
return types.NewPacket(requestNonce, nil, "", false), err
}
var response types.Packet
select {
case response = <-request.ResponsesCh:
return response, nil
case <-time.After(time.Millisecond * time.Duration(s.readTimeout)):
close(request.ResponsesCh)
return types.Packet{}, sharedTypes.ErrSocketRequestTimedOut(s.addr, requestNonce)
}
}
// The write routine.
//
// This routine performs buffered writes (`writeBufferSize``) on the established connection.
// This routine halts if:
// - the cancelable routine cancels
// - if the runner stops
// - if the socket closes
// - if some party signals that this routine should stop (by closing the s.writing channel)
// - if there is a write error
func (s *socket) write(ctx context.Context) {
defer s.signalWritingStop()
s.signalWritingStart()
writer:
for {
select {
case <-ctx.Done(): // stop if context cancels
break writer
case <-s.runner.Done(): // stop if runner stops
break writer
case <-s.done: // stop if the socket closes
break writer
case <-s.buffers.write.Signals(): // blocks
{
if !s.buffers.write.IsOpen() {
break writer
}
buff := s.buffers.write.DumpBytes()
if _, err := s.writer.Write(buff); err != nil {
s.error(err)
break writer
}
if err := s.writer.Flush(); err != nil {
s.error(err)
break writer
}
}
}
}
}
// Tracks and stores the encountered error
func (s *socket) error(err error) {
defer s.err.Unlock()
s.err.Lock()
if s.err.error == nil {
s.err.error = err
}
s.errored <- struct{}{}
}
// signal the readiness of the socket, called when everything has been perofrmed successfully when opening the socket
func (s *socket) signalReady() {
close(s.ready)
}
// signal that the socket has been opened. Flip the flag.
func (s *socket) signalOpen() {
s.isOpen.Store(true)
}
// signal that the socket has been closed.
func (s *socket) signalClose() {
s.isOpen.Store(false)
}
func (s *socket) stopErrorReporting() {
defer s.err.Unlock()
s.err.Lock()
close(s.errored)
}
func (s *socket) signalWritingStart() {
s.isWriting.Store(true)
s.writing <- struct{}{}
}
func (s *socket) signalWritingStop() {
close(s.writing)
s.isWriting.Store(false)
}
func (s *socket) signalReadingStart() {
s.isReading.Store(true)
s.reading <- struct{}{}
}
func (s *socket) signalReadingStop() {
close(s.reading)
s.isReading.Store(false)
}
// used by startIO to signal that IO has been kicked off
func (s *socket) signalIoStarted() {
s.ioStarted <- struct{}{}
}
func (s *socket) signalIoFailure() {
close(s.ioStarted)
}