This repository has been archived by the owner on Feb 17, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 7
/
stream.go
220 lines (185 loc) · 4.86 KB
/
stream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
// Package stream is a library of generic types designed to work on streams of
// values.
package stream
import (
"errors"
"io"
)
// Reader is an interface implemented by types that read a stream of values of
// type T.
//
// Reader is like io.Reader for values of any type.
type Reader[T any] interface {
	// Read reads values from the stream into the given slice, returning the
	// number of values read and any error that occurred.
	//
	// The error is io.EOF when the end of the stream has been reached. Callers
	// should consume the returned values before looking at the error: the
	// Reader returned by NewReader, for instance, reports io.EOF on the same
	// call that delivers its last values.
	Read(values []T) (int, error)
}
// NewReader returns a Reader that yields the given values in order.
//
// The values are copied on construction, so later modifications of the
// argument slice are not observed by the reader.
func NewReader[T any](values ...T) Reader[T] {
	pending := make([]T, len(values))
	copy(pending, values)
	return &reader[T]{values: pending}
}

// reader is the Reader implementation backing NewReader; values holds the
// elements that have not been read yet.
type reader[T any] struct {
	values []T
}

// Read copies as many pending values as fit into values and drops them from
// the reader. io.EOF is returned as soon as nothing is left, which includes
// the call that hands out the final values.
func (r *reader[T]) Read(values []T) (int, error) {
	n := copy(values, r.values)
	r.values = r.values[n:]
	if len(r.values) > 0 {
		return n, nil
	}
	return n, io.EOF
}
// ReadCloser represents a closable stream of values of T.
//
// ReadCloser is like io.ReadCloser for values of any type: it combines the
// Read method of Reader[T] with the Close method of io.Closer.
type ReadCloser[T any] interface {
	Reader[T]
	io.Closer
}
// NewReadCloser combines r and c into a single ReadCloser: reads are
// forwarded to r and Close is forwarded to c.
func NewReadCloser[T any](r Reader[T], c io.Closer) ReadCloser[T] {
	rc := new(readCloser[T])
	rc.reader = r
	rc.closer = c
	return rc
}

// readCloser pairs an independent Reader and io.Closer.
type readCloser[T any] struct {
	reader Reader[T]
	closer io.Closer
}

// Close closes the underlying closer and returns its error.
func (rc *readCloser[T]) Close() error {
	err := rc.closer.Close()
	return err
}

// Read reads from the underlying reader.
func (rc *readCloser[T]) Read(values []T) (int, error) {
	n, err := rc.reader.Read(values)
	return n, err
}
// ErrCloser returns a ReadCloser that never produces values: every Read
// returns zero and err, and Close always succeeds.
func ErrCloser[T any](err error) ReadCloser[T] {
	return errCloser[T]{err: err}
}

// errCloser is the ReadCloser returned by ErrCloser; err is the error handed
// back by every Read.
type errCloser[T any] struct {
	err error
}

// Close does nothing and reports no error.
func (errCloser[T]) Close() error {
	return nil
}

// Read ignores its argument and returns the configured error.
func (e errCloser[T]) Read(_ []T) (int, error) {
	return 0, e.err
}
// NopCloser wraps r in a ReadCloser whose Close method does nothing and
// always returns nil.
func NopCloser[T any](r Reader[T]) ReadCloser[T] {
	wrapped := new(nopCloser[T])
	wrapped.reader = r
	return wrapped
}

// nopCloser adapts a Reader to the ReadCloser interface with a no-op Close.
type nopCloser[T any] struct {
	reader Reader[T]
}

// Close is a no-op; it never fails.
func (*nopCloser[T]) Close() error {
	return nil
}

// Read forwards to the wrapped Reader.
func (nc *nopCloser[T]) Read(values []T) (int, error) {
	n, err := nc.reader.Read(values)
	return n, err
}
// ReadSeeker represents a seekable stream of values of T.
//
// ReadSeeker is like io.ReadSeeker for values of any type: it combines the
// Read method of Reader[T] with the Seek method of io.Seeker.
//
// NOTE(review): offsets are presumably counted in values rather than bytes;
// confirm against the implementations.
type ReadSeeker[T any] interface {
	Reader[T]
	io.Seeker
}
// ReadSeekCloser represents a seekable and closable stream of values of T.
//
// ReadSeekCloser is like io.ReadSeekCloser for values of any type: it combines
// the Read method of Reader[T] with the methods of io.Seeker and io.Closer.
type ReadSeekCloser[T any] interface {
	Reader[T]
	io.Seeker
	io.Closer
}
var (
	errSeekWhence = errors.New("seek: invalid whence value")
	errSeekOffset = errors.New("seek: offset out of range")
)

// Seek computes the absolute position that results from applying a seek
// operation to a stream, following the conventions of io.Seeker. It is a
// helper for implementing the Seek method of seekable streams.
//
// offset is the current position, length is the total size of the stream,
// and seek is the requested displacement interpreted according to whence:
//
//   - io.SeekStart: seek is the new absolute position;
//   - io.SeekCurrent: seek is relative to offset;
//   - io.SeekEnd: seek is relative to length (for example, -2 designates the
//     penultimate position).
//
// The resulting position is clamped to length when it would go past the end
// of the stream. Seek returns -1 and an error when whence is not one of the
// io.Seek* constants or when the resulting position would be negative.
func Seek(offset, length, seek int64, whence int) (int64, error) {
	switch whence {
	case io.SeekStart:
		offset = seek
	case io.SeekCurrent:
		offset += seek
	case io.SeekEnd:
		// io.Seeker defines SeekEnd displacements as relative to the end, so
		// the displacement is added: negative values move backward from the
		// end, positive values are clamped to length below.
		offset = length + seek
	default:
		return -1, errSeekWhence
	}
	if offset < 0 {
		return -1, errSeekOffset
	}
	if offset > length {
		offset = length
	}
	return offset, nil
}
// ReadAll drains r and returns every value it produced.
//
// Reading stops at the first error. io.EOF marks normal completion and is
// reported as a nil error; any other error is returned together with the
// values read before it occurred.
func ReadAll[T any](r Reader[T]) ([]T, error) {
	buf := make([]T, 0, 1)
	for {
		// Grow the buffer once it is full, keeping the length unchanged so
		// the free space sits between len and cap.
		if used := len(buf); used == cap(buf) {
			buf = append(buf, make([]T, 2*used)...)[:used]
		}
		n, err := r.Read(buf[len(buf):cap(buf)])
		buf = buf[:len(buf)+n]
		switch err {
		case nil:
			continue
		case io.EOF:
			return buf, nil
		default:
			return buf, err
		}
	}
}
// ReaderFunc adapts the function f to the Reader interface: each call to Read
// on the returned Reader invokes f with the same arguments.
func ReaderFunc[T any](f func([]T) (int, error)) Reader[T] {
	var r readerFunc[T] = f
	return r
}

// readerFunc is a function type implementing Reader by calling itself.
type readerFunc[T any] func([]T) (int, error)

// Read calls the underlying function with values.
func (fn readerFunc[T]) Read(values []T) (int, error) {
	n, err := fn(values)
	return n, err
}
// Writer is an interface implemented by types that write a stream of values of
// type T.
//
// Writer is like io.Writer for values of any type.
type Writer[T any] interface {
	// Write writes values to the stream, returning the number of values
	// written and any error that occurred.
	//
	// Copy treats a Write that reports fewer values than it was given without
	// returning an error as a failure, so implementations should return a
	// non-nil error whenever they cannot accept all the values.
	Write(values []T) (int, error)
}
// WriteCloser represents a closable stream of values of T.
//
// WriteCloser is like io.WriteCloser for values of any type: it combines the
// Write method of Writer[T] with the Close method of io.Closer.
type WriteCloser[T any] interface {
	Writer[T]
	io.Closer
}
// NewWriteCloser combines w and c into a single WriteCloser: writes are
// forwarded to w and Close is forwarded to c.
func NewWriteCloser[T any](w Writer[T], c io.Closer) WriteCloser[T] {
	wc := new(writeCloser[T])
	wc.writer = w
	wc.closer = c
	return wc
}

// writeCloser pairs an independent Writer and io.Closer.
type writeCloser[T any] struct {
	writer Writer[T]
	closer io.Closer
}

// Write writes to the underlying writer.
func (wc *writeCloser[T]) Write(values []T) (int, error) {
	n, err := wc.writer.Write(values)
	return n, err
}

// Close closes the underlying closer and returns its error.
func (wc *writeCloser[T]) Close() error {
	err := wc.closer.Close()
	return err
}
// Copy moves values from r to w until r is exhausted or an error occurs,
// returning the number of values written.
//
// Values are transferred in batches of up to 20. io.EOF from r marks normal
// completion and is reported as a nil error. An error from w is returned as
// is, and io.ErrNoProgress is returned when w accepts fewer values than it
// was given without reporting an error.
func Copy[T any](w Writer[T], r Reader[T]) (int64, error) {
	var (
		buf   = make([]T, 20)
		total int64
	)
	for {
		rn, rerr := r.Read(buf)
		// Flush whatever was read before looking at the read error, since a
		// reader may return values together with io.EOF.
		if rn > 0 {
			wn, werr := w.Write(buf[:rn])
			total += int64(wn)
			switch {
			case werr != nil:
				return total, werr
			case wn < rn:
				return total, io.ErrNoProgress
			}
		}
		switch rerr {
		case nil:
			// Keep copying.
		case io.EOF:
			return total, nil
		default:
			return total, rerr
		}
	}
}
// ReadFull reads exactly len(buf) values from r into buf, returning the
// number of values read.
//
// It is ReadAtLeast with the minimum set to the size of the buffer, and
// reports errors exactly as ReadAtLeast does.
func ReadFull[T any](r Reader[T], buf []T) (int, error) {
	want := len(buf)
	n, err := ReadAtLeast(r, buf, want)
	return n, err
}
// ReadAtLeast reads from r into buf until at least min values have been read,
// returning the number of values read.
//
// It returns io.ErrShortBuffer without reading when buf cannot hold min
// values. The error is nil once min values have been read, even if the last
// Read reported an error. When reading stops early, the error is the one
// returned by r, except that io.EOF after some values were read is reported
// as io.ErrUnexpectedEOF.
func ReadAtLeast[T any](r Reader[T], buf []T, min int) (n int, err error) {
	if min > len(buf) {
		return 0, io.ErrShortBuffer
	}
	for n < min {
		got, rerr := r.Read(buf[n:])
		n += got
		if rerr != nil {
			err = rerr
			break
		}
	}
	switch {
	case n >= min:
		// The minimum was met, so a trailing read error is dropped.
		return n, nil
	case n > 0 && err == io.EOF:
		return n, io.ErrUnexpectedEOF
	default:
		return n, err
	}
}