-
Notifications
You must be signed in to change notification settings - Fork 1.4k
/
stream_reader.go
75 lines (62 loc) · 1.93 KB
/
stream_reader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
/*
Copyright 2017, 2019 the Velero contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package framework
import (
"bytes"
"io"
)
// ReceiveFunc is a function that either returns a slice
// of an arbitrary number of bytes OR an error. Returning
// an io.EOF means there is no more data to be read; any
// other error is considered an actual error.
type ReceiveFunc func() ([]byte, error)
// CloseFunc is used to signal to the source of data that
// the StreamReadCloser has been closed.
type CloseFunc func() error
// StreamReadCloser wraps a ReceiveFunc and a CloseSendFunc
// to implement io.ReadCloser.
type StreamReadCloser struct {
buf *bytes.Buffer
receive ReceiveFunc
close CloseFunc
}
func (s *StreamReadCloser) Read(p []byte) (n int, err error) {
for {
// if buf exists and holds at least as much as we're trying to read,
// read from the buffer
if s.buf != nil && s.buf.Len() >= len(p) {
return s.buf.Read(p)
}
// if buf is nil, create it
if s.buf == nil {
s.buf = new(bytes.Buffer)
}
// buf exists but doesn't hold enough data to fill p, so
// receive again. If we get an EOF, return what's in the
// buffer; else, write the new data to the buffer and
// try another read.
data, err := s.receive()
if err == io.EOF {
return s.buf.Read(p)
}
if err != nil {
return 0, err
}
if _, err := s.buf.Write(data); err != nil {
return 0, err
}
}
}
func (s *StreamReadCloser) Close() error {
return s.close()
}