-
Notifications
You must be signed in to change notification settings - Fork 211
/
chan.go
149 lines (128 loc) · 3.02 KB
/
chan.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
package delimited
import (
"errors"
"fmt"
"github.com/spacemeshos/go-spacemesh/log"
"io"
"sync"
)
// Chan is a delimited duplex channel. It is used to have a channel interface
// around a delimited.Reader or Writer.
type Chan struct {
connection io.ReadWriteCloser
closeOnce sync.Once
cmtx sync.RWMutex
closed bool
outMsgChan chan outMessage
inMsgChan chan []byte
CloseChan chan struct{}
}
// Satisfy formatter.
// In exposes the incoming message channel
func (s *Chan) In() chan []byte {
return s.inMsgChan
}
// Out sends message on the wire, blocking.
func (s *Chan) Out(message []byte) error {
s.cmtx.RLock()
if !s.closed {
outCb := make(chan error)
s.outMsgChan <- outMessage{message, outCb}
s.cmtx.RUnlock()
return <-outCb
}
s.cmtx.RUnlock()
return fmt.Errorf("formatter is closed")
}
type outMessage struct {
m []byte
r chan error
}
func (om outMessage) Message() []byte {
return om.m
}
func (om outMessage) Result() chan error {
return om.r
}
// NewChan constructs a Chan with a given buffer size.
func NewChan(chanSize int) *Chan {
return &Chan{
outMsgChan: make(chan outMessage, chanSize),
inMsgChan: make(chan []byte, chanSize),
CloseChan: make(chan struct{}),
}
}
// Pipe invokes the reader and writer flows, once it's ran Chan can start serving incoming/outgoing messages
func (s *Chan) Pipe(rwc io.ReadWriteCloser) {
s.connection = rwc
go s.readFromReader(rwc)
go s.writeToWriter(rwc)
}
// ReadFrom wraps the given io.Reader with a delimited.Reader, reads all
// messages, ands sends them down the channel.
func (s *Chan) readFromReader(r io.Reader) {
mr := NewReader(r)
// single reader, no need for Mutex
Loop:
for {
buf, err := mr.Next()
if err != nil {
log.Debug("conn: Read chan closed err: %v", err)
break Loop
}
select {
case <-s.CloseChan:
break Loop // told we're done
default:
if buf != nil {
newbuf := make([]byte, len(buf))
copy(newbuf, buf)
// ok seems fine. send it away
s.inMsgChan <- newbuf
}
}
}
s.Close() // close writer
close(s.inMsgChan)
}
// WriteToWriter wraps the given io.Writer with a delimited.Writer, listens on the
// channel and writes all messages to the writer.
func (s *Chan) writeToWriter(w io.Writer) {
// new buffer per message
// if bottleneck, cycle around a set of buffers
mw := NewWriter(w)
// single writer, no need for Mutex
Loop:
for {
s.cmtx.RLock()
cl := s.closed
s.cmtx.RUnlock()
if cl {
break Loop
}
msg := <-s.outMsgChan
if _, err := mw.WriteRecord(msg.Message()); err != nil {
// unexpected error. tell the client.
msg.Result() <- err
break Loop
} else {
// Report msg was sent
msg.Result() <- nil
}
}
cou := len(s.outMsgChan)
for i := 0; i < cou; i++ {
msg := <-s.outMsgChan
msg.Result() <- errors.New("formatter is closed")
}
}
// Close the Chan
func (s *Chan) Close() {
s.closeOnce.Do(func() {
s.cmtx.Lock()
s.closed = true
s.cmtx.Unlock()
close(s.CloseChan) // close both writer and reader
s.connection.Close() // close internal connection
})
}