/
memory_pipe.go
120 lines (102 loc) · 3.24 KB
/
memory_pipe.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
package memorypipe
import (
"context"
"io"
)
const bufferSize = 10
// msg represents an internal structure sent between the pipes.
type msg struct {
data []byte
err error
}
// pipe provides a bidirectional pipe compatible with io.ReadWriteCloser interface.
// Note, however, that it does not behave exactly how one would expect an io.ReadWriteCloser to
// behave. Specifically the Close() function does not close the pipe, but instead delivers an io.EOF
// error to the next reader. After which it can be read again to receive new data. This means the
// pipe can be closed multiple times. Each time it indicates that one particular session has ended.
// The reason for this is to emulate the WebsocketIO's behaviour by allowing a single persistent
// connection to be used for multiple sessions.
type pipe struct {
ch chan msg
ctx context.Context
otherEnd *pipe
msgRemainder []byte
}
// Read reads from the pipe into p. Returns number of bytes read and any errors.
func (p *pipe) Read(b []byte) (int, error) {
if p.msgRemainder != nil {
n := copy(b, p.msgRemainder)
if len(p.msgRemainder) > n {
tmpBuf := make([]byte, len(p.msgRemainder)-n)
copy(tmpBuf, p.msgRemainder[n:])
p.msgRemainder = tmpBuf
} else {
p.msgRemainder = nil
}
return n, nil
}
select {
case msg := <-p.ch:
if msg.err == io.EOF {
return 0, msg.err
}
n := copy(b, msg.data)
// Store the remainder of the message for next Read.
if len(msg.data) > n {
p.msgRemainder = make([]byte, len(msg.data)-n)
copy(p.msgRemainder, msg.data[n:])
} else {
p.msgRemainder = nil
}
return n, msg.err
case <-p.ctx.Done():
return 0, p.ctx.Err()
}
}
// Write writes to the pipe from p. Returns number of bytes written and any errors.
func (p *pipe) Write(b []byte) (int, error) {
msg := msg{
data: make([]byte, len(b)),
err: nil,
}
// Create copy of b in case it is modified externally.
copy(msg.data, b)
select {
case p.otherEnd.ch <- msg: // Sent msg to the other side's Read function.
return len(msg.data), msg.err
case <-p.ctx.Done():
return 0, p.ctx.Err()
}
}
// Close is unusual in that it doesn't actually close the pipe. Instead it sends an io.EOF error
// to the other side's Read function. This is so the other side can detect that a session has ended.
// Each call to Close will indicate to the other side that a session has ended, whilst allowing the
// reuse of a single persistent pipe for multiple sessions.
func (p *pipe) Close() error {
msg := msg{
data: nil,
err: io.EOF, // Indicates to the other side's Read function that session has ended.
}
select {
case p.otherEnd.ch <- msg: // Sent msg to the other side's Read function.
return nil
case <-p.ctx.Done():
return p.ctx.Err()
}
}
// NewPipePair returns a pair of io.ReadWriterCloser pipes that are connected together such that
// writes to one will appear as reads on the other and vice versa. Calling Close() on one end will
// indicate to the other end that the session has ended.
func NewPipePair(ctx context.Context) (io.ReadWriteCloser, io.ReadWriteCloser) {
aEnd := &pipe{
ch: make(chan msg, bufferSize),
ctx: ctx,
}
bEnd := &pipe{
ch: make(chan msg, bufferSize),
ctx: ctx,
}
aEnd.otherEnd = bEnd
bEnd.otherEnd = aEnd
return aEnd, bEnd
}