/
tee.go
241 lines (198 loc) · 4.79 KB
/
tee.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information
package sync2
import (
"errors"
"io"
"sync"
"sync/atomic"
"github.com/calebcase/tmpfile"
)
// PipeWriter is an io.WriteCloser that can additionally be closed with an
// explicit error describing why the stream ended.
type PipeWriter interface {
io.WriteCloser
// CloseWithError closes the writer, passing reason as the cause of the close.
CloseWithError(reason error) error
}
// PipeReader is an io.ReadCloser that can additionally be closed with an
// explicit error.
type PipeReader interface {
io.ReadCloser
// CloseWithError closes the reader, passing reason as the cause of the close.
CloseWithError(reason error) error
}
// NewTeeFile builds a tee whose data is stored in a temporary file rather
// than in memory.
//
// The file is obtained from tmpfile.New(tempdir, "tee"); if that fails the
// error is returned and no tee is created. On success it returns one
// PipeReader per requested reader plus the single PipeWriter feeding them.
// Every handle wraps the same file and shares one handle counter, which
// starts at readers+1 (the extra one accounts for the writer).
func NewTeeFile(readers int, tempdir string) ([]PipeReader, PipeWriter, error) {
	file, err := tmpfile.New(tempdir, "tee")
	if err != nil {
		return nil, nil, err
	}

	// one handle per reader, plus one for the writer
	handles := int64(readers + 1)

	shared := &tee{open: &handles}
	// both condition variables are tied to the tee's single mutex
	shared.nodata.L = &shared.mu
	shared.noreader.L = &shared.mu

	// newHandle wraps the common file and the common handle counter.
	newHandle := func() *sharedFile {
		return &sharedFile{
			file: file,
			open: &handles,
		}
	}

	result := make([]PipeReader, readers)
	for i := range result {
		result[i] = &teeReader{
			tee:    shared,
			buffer: newHandle(),
		}
	}

	writer := &teeWriter{
		tee:    shared,
		buffer: newHandle(),
	}
	return result, writer, nil
}
// NewTeeInmemory builds a tee whose data is kept in memory.
//
// A first memoryBlock with blockSize bytes of storage is allocated up front;
// every reader and the writer start out positioned on that block. It returns
// one PipeReader per requested reader plus the single PipeWriter feeding
// them. The returned error is always nil.
//
// NOTE(review): the handle counter is stored on the tee but is not handed to
// the block reader/writer created here — confirm where it is decremented
// when an in-memory reader is closed.
func NewTeeInmemory(readers int, blockSize int64) ([]PipeReader, PipeWriter, error) {
	first := &memoryBlock{
		data: make([]byte, blockSize),
	}

	// one handle per reader, plus one for the writer
	handles := int64(readers + 1)

	shared := &tee{open: &handles}
	// both condition variables are tied to the tee's single mutex
	shared.nodata.L = &shared.mu
	shared.noreader.L = &shared.mu

	result := make([]PipeReader, readers)
	for i := range result {
		result[i] = &teeReader{
			tee:    shared,
			buffer: &blockReader{current: first},
		}
	}

	writer := &teeWriter{
		tee:    shared,
		buffer: &blockWriter{current: first},
	}
	return result, writer, nil
}
// tee synchronizes access to a shared buffer with one writer and multiple readers.
//
// maxRead, write, writerDone and writerErr are only modified while mu is held.
type tee struct {
// noCopy is declared elsewhere; presumably it marks the struct as unsafe to
// copy for vet's copylocks check — TODO confirm.
noCopy noCopy //nolint:structcheck
// open points at the handle counter created by the constructor
// (readers + 1 for the writer). Write loads it atomically and gives up
// once it is <= 1.
open *int64
// mu is the Locker of both condition variables below.
mu sync.Mutex
// nodata is waited on by readers that have caught up with the write head;
// it is broadcast after every write and when the writer is closed.
nodata sync.Cond
// noreader is waited on by the writer while no reader needs more data;
// it is broadcast when maxRead grows and when a reader is closed.
noreader sync.Cond
// maxRead is the furthest end offset (position + requested length) any
// reader has asked for so far.
maxRead int64
// write is the total number of bytes the writer has put into the buffer.
write int64
// writerDone is set once the writer has been closed.
writerDone bool
// writerErr is the reason given when the writer was closed; a plain Close
// stores io.EOF.
writerErr error
}
// teeReader is one reading handle of a tee.
type teeReader struct {
// tee is the state shared with the writer and the other readers.
tee *tee
// buffer is the storage handle this reader reads from and closes.
buffer io.ReadCloser
// pos is the number of bytes this reader has read so far.
pos int64
// closed is switched from 0 to 1 with an atomic compare-and-swap so that
// buffer is closed at most once.
closed int32
}
// teeWriter is the single writing handle of a tee.
type teeWriter struct {
// tee is the state shared with the readers.
tee *tee
// buffer is the storage handle this writer writes to and closes.
buffer io.WriteCloser
}
// Read fills data with bytes the writer has already stored, starting at this
// reader's current position.
//
// If the writer was closed with an error other than io.EOF, that error is
// returned immediately, even when unread data remains. Otherwise the call
// announces how far this reader wants to read (so a waiting writer can
// proceed) and blocks until the write head has moved past the reader's
// position. When the writer is closed and nothing is left to read, the
// writer's close reason (io.EOF for a plain Close) is returned.
//
// At most the number of bytes currently available is read, so n may be
// smaller than len(data).
func (reader *teeReader) Read(data []byte) (n int, err error) {
	shared := reader.tee

	// reserve performs the synchronized part of the read: it reports either
	// how many bytes may be read right now, or that the read must stop with
	// the given error.
	reserve := func() (allowed int64, stop bool, stopErr error) {
		shared.mu.Lock()
		defer shared.mu.Unlock()

		// fail fast when the writer was closed with a real error
		if failure := shared.writerErr; failure != nil && !errors.Is(failure, io.EOF) {
			return 0, true, failure
		}

		wanted := int64(len(data))

		// publish the furthest offset any reader needs and wake the writer
		if target := reader.pos + wanted; target > shared.maxRead {
			shared.maxRead = target
			shared.noreader.Broadcast()
		}

		// sleep until the writer is ahead of us, or gone
		for reader.pos >= shared.write {
			if shared.writerDone {
				return 0, true, shared.writerErr
			}
			shared.nodata.Wait()
		}

		// never hand out more than has been written
		allowed = shared.write - reader.pos
		if wanted < allowed {
			allowed = wanted
		}
		return allowed, false, nil
	}

	allowed, stop, stopErr := reserve()
	if stop {
		return 0, stopErr
	}

	// the actual copy happens outside the lock
	n, err = reader.buffer.Read(data[:allowed])
	reader.pos += int64(n)
	return n, err
}
// Write stores data in the shared buffer and wakes readers waiting for it.
//
// It blocks while the write head is already past the furthest offset any
// reader has asked for. It returns io.ErrClosedPipe without writing anything
// when the writer has already been closed, or when it would have to wait
// and the open-handle counter shows that at most the writer itself is left.
//
// The byte count and error of the underlying buffer write are returned
// as-is; the write head is advanced by the byte count even on error.
func (writer *teeWriter) Write(data []byte) (n int, err error) {
	shared := writer.tee

	// refused is true when the write must be rejected; otherwise the closure
	// returns once at least one reader needs more data.
	refused := func() bool {
		shared.mu.Lock()
		defer shared.mu.Unlock()

		// already closed
		if shared.writerDone {
			return true
		}

		for shared.write > shared.maxRead {
			// nobody but the writer left to consume the data
			if atomic.LoadInt64(shared.open) <= 1 {
				return true
			}
			// wait for a reader to ask for more
			shared.noreader.Wait()
		}
		return false
	}()
	if refused {
		return 0, io.ErrClosedPipe
	}

	// the actual write happens outside the lock
	n, err = writer.buffer.Write(data)

	// advance the write head and wake the readers
	shared.mu.Lock()
	shared.write += int64(n)
	shared.nodata.Broadcast()
	shared.mu.Unlock()

	return n, err
}
// Close implements io.Closer for the reader; it is the same as calling
// CloseWithError with a nil reason.
func (reader *teeReader) Close() error {
	return reader.CloseWithError(nil)
}
// Close implements io.Closer for the writer; it is the same as calling
// CloseWithError with a nil reason.
func (writer *teeWriter) Close() error {
	return writer.CloseWithError(nil)
}
// CloseWithError closes this reader's buffer handle and wakes a writer that
// may be waiting for readers.
//
// The buffer is closed only on the first call (guarded by an atomic
// compare-and-swap on reader.closed); later calls return nil. The reason
// argument is accepted to satisfy PipeReader but is not used.
//
// It returns the error from closing the buffer, if any.
func (reader *teeReader) CloseWithError(reason error) (err error) {
	if atomic.CompareAndSwapInt32(&reader.closed, 0, 1) {
		err = reader.buffer.Close()
	}

	// Broadcast while holding the mutex. The writer checks whether readers
	// remain while holding tee.mu and then calls noreader.Wait(); without
	// the lock this broadcast could land between that check and the Wait,
	// be lost, and leave the writer blocked forever.
	tee := reader.tee
	tee.mu.Lock()
	tee.noreader.Broadcast()
	tee.mu.Unlock()

	return err
}
// CloseWithError marks the writer as finished, records why, wakes all
// readers waiting for data and closes the writer's buffer handle.
//
// A nil reason is stored as io.EOF, so readers see a regular end of stream.
// Calling it on a writer that is already closed changes nothing and returns
// io.ErrClosedPipe; otherwise the result of closing the buffer is returned.
func (writer *teeWriter) CloseWithError(reason error) error {
	cause := reason
	if cause == nil {
		cause = io.EOF
	}

	shared := writer.tee

	// alreadyClosed is true when an earlier close has won; otherwise the
	// closure has recorded this close and notified the readers.
	alreadyClosed := func() bool {
		shared.mu.Lock()
		defer shared.mu.Unlock()

		if shared.writerDone {
			return true
		}

		shared.writerDone = true
		shared.writerErr = cause
		shared.nodata.Broadcast()
		return false
	}()
	if alreadyClosed {
		return io.ErrClosedPipe
	}

	return writer.buffer.Close()
}