forked from weaveworks/libgitops
/
frame_reader.go
168 lines (148 loc) · 5.29 KB
/
frame_reader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
package serializer
import (
"bytes"
"errors"
"io"
"io/ioutil"
"os"
"k8s.io/apimachinery/pkg/runtime/serializer/json"
)
const (
defaultBufSize = 64 * 1024 // 64 kB
defaultMaxFrameSize = 16 * 1024 * 1024 // 16 MB
)
var (
// FrameOverflowErr is returned from FrameReader.ReadFrame when one frame exceeds the
// maximum size of 16 MB.
FrameOverflowErr = errors.New("frame was larger than maximum allowed size")
)
// ReadCloser in this package is an alias for io.ReadCloser. It helps in Godoc to locate
// helpers in this package which returns writers (i.e. FromFile and FromBytes)
type ReadCloser io.ReadCloser
// FrameReader is a content-type specific reader of a given ReadCloser.
// The FrameReader reads frames from the underlying ReadCloser and returns them for consumption.
// When io.EOF is reached, the stream is closed automatically.
type FrameReader interface {
ContentTyped
io.Closer
// ReadFrame reads frames from the underlying ReadCloser and returns them for consumption.
// When io.EOF is reached, the stream is closed automatically.
ReadFrame() ([]byte, error)
}
// NewFrameReader returns a FrameReader for the given ContentType and data in the
// ReadCloser. The Reader is automatically closed in io.EOF. ReadFrame is called
// once each Decoder.Decode() or Decoder.DecodeInto() call. When Decoder.DecodeAll() is
// called, the FrameReader is read until io.EOF, upon where it is closed.
func NewFrameReader(contentType ContentType, rc ReadCloser) FrameReader {
switch contentType {
case ContentTypeYAML:
return newFrameReader(json.YAMLFramer.NewFrameReader(rc), contentType)
case ContentTypeJSON:
return newFrameReader(json.Framer.NewFrameReader(rc), contentType)
default:
return &errFrameReader{ErrUnsupportedContentType, contentType}
}
}
// NewYAMLFrameReader returns a FrameReader that supports both YAML and JSON. Frames are separated by "---\n"
//
// This call is the same as NewFrameReader(ContentTypeYAML, rc)
func NewYAMLFrameReader(rc ReadCloser) FrameReader {
return NewFrameReader(ContentTypeYAML, rc)
}
// NewJSONFrameReader returns a FrameReader that supports both JSON. Objects are read from the stream one-by-one,
// each object making up its own frame.
//
// This call is the same as NewFrameReader(ContentTypeJSON, rc)
func NewJSONFrameReader(rc ReadCloser) FrameReader {
return NewFrameReader(ContentTypeJSON, rc)
}
// newFrameReader returns a new instance of the frameReader struct
func newFrameReader(rc io.ReadCloser, contentType ContentType) *frameReader {
return &frameReader{
rc: rc,
bufSize: defaultBufSize,
maxFrameSize: defaultMaxFrameSize,
contentType: contentType,
}
}
// frameReader is a FrameReader implementation
type frameReader struct {
rc io.ReadCloser
bufSize int
maxFrameSize int
contentType ContentType
// TODO: Maybe add mutexes for thread-safety (so no two goroutines read at the same time)
}
// ReadFrame reads one frame from the underlying io.Reader. ReadFrame
// keeps on reading from the Reader in bufSize blocks, until the Reader either
// returns err == nil or EOF. If the Reader reports an ErrShortBuffer error,
// ReadFrame keeps on reading using new calls. ReadFrame might return both data and
// io.EOF. io.EOF will be returned in the final call.
func (rf *frameReader) ReadFrame() (frame []byte, err error) {
// Temporary buffer to parts of a frame into
var buf []byte
// How many bytes were read by the read call
var n int
// Multiplier for bufsize
c := 1
for {
// Allocate a buffer of a multiple of bufSize.
buf = make([]byte, c*rf.bufSize)
// Call the underlying reader.
n, err = rf.rc.Read(buf)
// Append the returned bytes to the b slice returned
// If n is 0, this call is a no-op
frame = append(frame, buf[:n]...)
// If the frame got bigger than the max allowed size, return and report the error
if len(frame) > rf.maxFrameSize {
err = FrameOverflowErr
return
}
// Handle different kinds of errors
switch err {
case io.ErrShortBuffer:
// ignore the "buffer too short" error, and just keep on reading, now doubling the buffer
c *= 2
continue
case nil:
// One document is "done reading", we should return it if valid
// Only return non-empty documents, i.e. skip e.g. leading `---`
if len(bytes.TrimSpace(frame)) > 0 {
// valid non-empty document
return
}
// The document was empty, reset the frame (just to be sure) and continue
frame = nil
continue
case io.EOF:
// we reached the end of the file, close the reader and return
rf.rc.Close()
return
default:
// unknown error, return it immediately
// TODO: Maybe return the error here?
return
}
}
}
// ContentType returns the content type for the given FrameReader
func (rf *frameReader) ContentType() ContentType {
return rf.contentType
}
// Close implements io.Closer and closes the underlying ReadCloser
func (rf *frameReader) Close() error {
return rf.rc.Close()
}
// FromFile returns a ReadCloser from the given file, or a ReadCloser which returns
// the given file open error when read.
func FromFile(filePath string) ReadCloser {
f, err := os.Open(filePath)
if err != nil {
return &errReadCloser{err}
}
return f
}
// FromBytes returns a ReadCloser from the given byte content.
func FromBytes(content []byte) ReadCloser {
return ioutil.NopCloser(bytes.NewReader(content))
}