forked from influxdata/telegraf
/
writer.go
229 lines (193 loc) · 5.88 KB
/
writer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
package snappystream
import (
"bufio"
"errors"
"fmt"
"hash/crc32"
"io"
"github.com/mreiferson/go-snappystream/snappy-go"
)
// errClosed is the sticky error a BufferedWriter reports from every method
// call made after Close has completed successfully.
var errClosed = errors.New("closed")
// BufferedWriter is an io.WriteCloser that produces the same snappy framed
// output as the writer returned by NewWriter, but accumulates written data
// first so that each emitted block is as large as possible. Larger blocks
// generally compress better, at some cost in speed. The benefit over
// NewWriter is most noticeable for long streams built from many small writes.
//
// Data still sitting in the internal buffer is only emitted by Flush or
// Close. Forgetting to call one of them after the last write will most likely
// drop trailing data frames, and a decoder has no way to detect that loss.
//
// NOTE: the zero value of BufferedWriter is not usable; always construct one
// with NewBufferedWriter rather than a struct literal.
type BufferedWriter struct {
	// w is the unbuffered framing writer that bw drains into.
	w *writer
	// bw buffers caller data before handing it to w.
	bw *bufio.Writer
	// err is sticky: once set (by a failed buffer operation, or to errClosed
	// after a successful Close) every subsequent call returns it.
	err error
}
// NewBufferedWriter returns a BufferedWriter that frames data onto w using an
// internal buffer of MaxBlockSize bytes. Errors are sticky: if writing a block
// to w fails, every later call fails with that same error. Once all data has
// been written, call Flush (or Close) to make sure everything buffered has
// actually been forwarded to w.
func NewBufferedWriter(w io.Writer) *BufferedWriter {
	framer := NewWriter(w).(*writer)
	buffered := &BufferedWriter{w: framer}
	buffered.bw = bufio.NewWriterSize(framer, MaxBlockSize)
	return buffered
}
// ReadFrom implements io.ReaderFrom so that io.Copy can stream into w
// directly. Everything read from r is encoded as a snappy framed stream and
// written to the underlying writer. It reports how many bytes were read from r
// along with any error other than io.EOF. A non-nil error is also stored and
// returned by every later call.
func (w *BufferedWriter) ReadFrom(r io.Reader) (int64, error) {
	if w.err != nil {
		return 0, w.err
	}
	n, err := w.bw.ReadFrom(r)
	w.err = err
	return n, err
}
// Write hands p to the internal bufio.Writer. Whenever bufio passes data
// through (because the buffer filled up, or p is too large to buffer), that
// data is encoded as framed blocks and written to the underlying writer.
// On success Write returns len(p). On failure it returns 0 and the error,
// which then sticks for all later calls.
func (w *BufferedWriter) Write(p []byte) (int, error) {
	if w.err == nil {
		_, w.err = w.bw.Write(p)
	}
	if w.err != nil {
		return 0, w.err
	}
	return len(p), nil
}
// Flush encodes whatever is currently buffered as a block and writes it to the
// underlying writer, even when that is less than a full MaxBlockSize block.
// It is a no-op returning the stored error if w has already failed or been
// closed.
func (w *BufferedWriter) Flush() error {
	if w.err != nil {
		return w.err
	}
	w.err = w.bw.Flush()
	return w.err
}
// Close flushes any buffered data and releases w's internal buffer and framing
// writer. After a successful Close, every method call on w returns an error.
// If the final flush fails, that error is returned and kept as w's sticky
// error. Close never closes the underlying io.Writer.
func (w *BufferedWriter) Close() error {
	if w.err != nil {
		return w.err
	}
	err := w.bw.Flush()
	// Drop references to the internal structures whether or not the flush
	// succeeded; the sticky error prevents any further use of them.
	w.bw, w.w = nil, nil
	if err != nil {
		w.err = err
		return err
	}
	w.err = errClosed
	return nil
}
// writer is the unbuffered snappy framed stream encoder returned by NewWriter.
type writer struct {
	// writer is the destination that receives the framed stream.
	writer io.Writer
	// sentStreamID records whether the stream identifier block has already
	// been written to writer.
	sentStreamID bool
	// hdr is reusable scratch space for one block header (filled by
	// writeHeader).
	hdr []byte
	// dst is reusable scratch space for snappy.Encode output.
	dst []byte
	// err is sticky: once a write fails, every later Write returns it.
	err error
}
// NewWriter returns an io.Writer that writes its input to w encoded as a
// snappy framed stream. A stream identifier block is written to w ahead of the
// first data block. The returned writer never emits a block whose length
// exceeds MaxBlockSize+4 bytes. It also never puts more than MaxBlockSize bytes
// of uncompressed data into a single block.
//
// Each Write returns either len(p) or 0, no matter how many compressed bytes
// actually reach w. A return of 0 always comes with a non-nil error. Input
// longer than MaxBlockSize is split into consecutive blocks, and all of them
// are emitted before Write returns.
func NewWriter(w io.Writer) io.Writer {
	fw := new(writer)
	fw.writer = w
	// Room for one block header: type byte, 3-byte length, 4-byte checksum.
	fw.hdr = make([]byte, 8)
	// Initial scratch space for encoded output; write replaces it with
	// whatever buffer snappy.Encode returns.
	fw.dst = make([]byte, 4096)
	return fw
}
// Write splits p into chunks of at most MaxBlockSize bytes and emits each as a
// framed block. It returns len(p) on success. On the first failure it returns
// 0 and the error, which is also stored so every later Write fails with it.
func (w *writer) Write(p []byte) (int, error) {
	if w.err != nil {
		return 0, w.err
	}
	var total int
	for len(p) > 0 {
		chunk := p
		if len(chunk) > MaxBlockSize {
			chunk = chunk[:MaxBlockSize]
		}
		var n int
		if n, w.err = w.write(chunk); w.err != nil {
			return 0, w.err
		}
		total += n
		p = p[n:]
	}
	return total, nil
}
// write encodes p as a single framed block and writes it to the underlying
// writer. If this is the first block of the stream, the stream identifier
// block is written ahead of it. p must be at most MaxBlockSize bytes long;
// larger input is rejected with an error.
//
// The block is stored compressed only when snappy actually makes it shorter.
// Otherwise p is stored verbatim as an uncompressed block. On success the
// returned count is always len(p).
func (w *writer) write(p []byte) (int, error) {
	if len(p) > MaxBlockSize {
		return 0, errors.New(fmt.Sprintf("block too large %d > %d", len(p), MaxBlockSize))
	}

	// Encode resizes based on len(dst) without regard to capacity, so reslice
	// to the full capacity to let it reuse the existing buffer instead of
	// allocating.
	w.dst = w.dst[:cap(w.dst)]
	encoded, err := snappy.Encode(w.dst, p)
	if err != nil {
		// Keep the previous scratch buffer rather than discarding it.
		return 0, err
	}
	w.dst = encoded

	// Data that does not shrink under compression is better stored as-is.
	btype, block := byte(blockCompressed), w.dst
	if len(w.dst) >= len(p) {
		btype, block = blockUncompressed, p
	}

	if !w.sentStreamID {
		if _, err := w.writer.Write(streamID); err != nil {
			return 0, err
		}
		w.sentStreamID = true
	}

	// The checksum always covers the decoded (original) data.
	writeHeader(w.hdr, btype, block, p)
	if _, err := w.writer.Write(w.hdr); err != nil {
		return 0, err
	}
	if _, err := w.writer.Write(block); err != nil {
		return 0, err
	}
	return len(p), nil
}
// writeHeader fills hdr[0:8] with a framed block header:
//
//	hdr[0]   block type (btype)
//	hdr[1:4] len(enc)+4 as a 3-byte little-endian integer; the +4 accounts
//	         for the checksum that precedes the payload
//	hdr[4:8] the CRC-32 of dec (computed with crcTable and masked by
//	         maskChecksum) as a 4-byte little-endian integer
//
// It panics if len(hdr) is less than 8.
func writeHeader(hdr []byte, btype byte, enc, dec []byte) {
	size := uint32(len(enc)) + 4
	sum := maskChecksum(crc32.Checksum(dec, crcTable))

	hdr[0] = btype
	for i := uint(0); i < 3; i++ {
		hdr[1+i] = byte(size >> (8 * i))
	}
	for i := uint(0); i < 4; i++ {
		hdr[4+i] = byte(sum >> (8 * i))
	}
}