This repository has been archived by the owner on Sep 11, 2020. It is now read-only.
/
writer.go
135 lines (111 loc) · 2.53 KB
/
writer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
package akaibu
import (
"bufio"
"code.google.com/p/snappy-go/snappy"
"compress/zlib"
"io"
)
// Writer is a sink for records. Every call to Write stores one record, and
// Close (from io.Closer) must be called once the caller is done writing.
type Writer interface {
	// Write stores p as a single record.
	Write(p []byte) error

	io.Closer
}
// writer is the general-purpose Writer implementation. It frames each record
// with writeRecord and sends it to an io.WriteCloser; the constructors in this
// file hand it either a flushing wrapper around a bufio.Writer or a zlib
// writer.
type writer struct {
	// w receives the framed records. Close sets it to nil.
	w io.WriteCloser
}
// Close closes the underlying io.WriteCloser and returns its error. The
// reference is dropped afterwards, so calling Close again is a no-op that
// returns nil.
func (w *writer) Close() (err error) {
	if w.w == nil {
		// Already closed (or never initialised): nothing to do.
		return nil
	}
	err = w.w.Close()
	w.w = nil
	return err
}
// Write frames p as one record via writeRecord and sends it to the underlying
// writer, returning whatever error writeRecord reports.
func (w *writer) Write(p []byte) (err error) {
	err = writeRecord(w.w, p)
	return err
}
// snappyWriter is the Writer implementation used for Snappy-compressed
// output. Records flow sbw -> sw -> bw.
//
// The field order is relied upon by the positional composite literal in
// NewSnappyCompressedWriter.
type snappyWriter struct {
	// bw is the buffered writer that the Snappy writer outputs into.
	bw *bufio.Writer
	// sw is the Snappy compressor layered on top of bw.
	sw *snappy.Writer
	// sbw buffers framed records in front of sw; Write targets this writer.
	sbw *bufio.Writer
}
// Close flushes the buffered data through the Snappy writer and on to the
// destination, then drops all internal references.
//
// The two buffers are flushed in pipeline order: sbw (uncompressed records)
// first, so its contents pass through the compressor, and bw (compressed
// output) second. Both flushes are always attempted; the first error
// encountered is the one returned. Previously an error from the first flush
// was overwritten by the result of the second and could be lost.
//
// Close does not close the destination writer. Calling Close again after the
// first call is a no-op that returns nil.
func (w *snappyWriter) Close() (err error) {
	if w.sbw == nil {
		// Already closed.
		return nil
	}

	err = w.sbw.Flush()
	// Still flush the outer buffer so whatever was compressed so far reaches
	// the destination, but do not let its result mask an earlier failure.
	if flushErr := w.bw.Flush(); err == nil {
		err = flushErr
	}

	w.sbw, w.bw, w.sw = nil, nil, nil
	return err
}
// Write frames p as one record via writeRecord and queues it on sbw, the
// buffer that sits in front of the Snappy writer.
func (w *snappyWriter) Write(p []byte) (err error) {
	err = writeRecord(w.sbw, p)
	return err
}
// writeHeader writes the 8-byte file header to w: the ASCII bytes "AKAI", a
// byte with value 1, the compression code c, and two zero bytes.
//
// NOTE(review): the 1 is presumably a format version and the trailing zeros
// reserved/padding; confirm against the reader.
func writeHeader(w io.Writer, c Compression) error {
	header := [...]byte{'A', 'K', 'A', 'I', 1, byte(c), 0, 0}
	return writeFull(w, header[:])
}
// writeRecord writes a single record to w: the length of p encoded by
// PackInt, followed by the bytes of p. For an empty p only the length prefix
// is written.
func writeRecord(w io.Writer, p []byte) (err error) {
	var prefix []byte
	prefix, err = PackInt(uint64(len(p)))
	if err != nil {
		return err
	}

	err = writeFull(w, prefix)
	if err != nil || len(p) == 0 {
		// Either the prefix failed to go out, or there is no payload to
		// follow it (in which case err is nil here).
		return err
	}

	return writeFull(w, p)
}
// NewUncompressedWriter returns a Writer that stores records on w without
// compression.
//
// Output is buffered: if w is already a *bufio.Writer it is used as is,
// otherwise it is wrapped in a new one. The file header is written into that
// buffer before the Writer is returned; an error doing so is returned with a
// nil Writer.
func NewUncompressedWriter(w io.Writer) (Writer, error) {
	buffered, alreadyBuffered := w.(*bufio.Writer)
	if !alreadyBuffered {
		buffered = bufio.NewWriter(w)
	}

	err := writeHeader(buffered, UncompressedCompression)
	if err != nil {
		return nil, err
	}

	return &writer{w: newFlushCloser(buffered)}, nil
}
// NewZlibCompressedWriter returns a Writer that stores records on w
// zlib-compressed. The file header is written to w uncompressed; everything
// written afterwards goes through the zlib writer.
//
// level has the same meaning as in compress/zlib. An invalid level is
// reported before anything is written to w, so a rejected call no longer
// leaves a stray header on the destination. Closing the returned Writer
// closes the zlib writer.
func NewZlibCompressedWriter(w io.Writer, level int) (Writer, error) {
	// Construct the compressor first: this is where level is validated.
	// compress/zlib only emits its own stream header on the first Write,
	// Flush or Close, so nothing reaches w ahead of the file header below.
	zw, err := zlib.NewWriterLevel(w, level)
	if err != nil {
		return nil, err
	}

	// The file header goes to w directly so that it stays uncompressed.
	if err := writeHeader(w, ZlibCompression); err != nil {
		return nil, err
	}

	return &writer{zw}, nil
}
// NewSnappyCompressedWriter returns a Writer that stores records on w
// Snappy-compressed.
//
// Compressed output is buffered: if w is already a *bufio.Writer it is used
// as is, otherwise it is wrapped in a new one. Records are additionally
// collected in a 64 KiB buffer in front of the Snappy writer. Note that the
// file header is written to w itself, not to the buffered writer.
func NewSnappyCompressedWriter(w io.Writer) (Writer, error) {
	out, alreadyBuffered := w.(*bufio.Writer)
	if !alreadyBuffered {
		out = bufio.NewWriter(w)
	}

	// The file header is handed straight to w.
	if err := writeHeader(w, SnappyCompression); err != nil {
		return nil, err
	}

	// Write an empty slice to the freshly created Snappy writer to force its
	// header to be written.
	compressor := snappy.NewWriter(out)
	_, err := compressor.Write([]byte{})
	if err != nil {
		return nil, err
	}

	return &snappyWriter{
		bw:  out,
		sw:  compressor,
		sbw: bufio.NewWriterSize(compressor, 1<<16),
	}, nil
}