-
Notifications
You must be signed in to change notification settings - Fork 53
/
compression.go
84 lines (73 loc) · 1.45 KB
/
compression.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
package patch
import (
"io"
"sync"
"github.com/klauspost/compress/zstd"
"google.golang.org/grpc/encoding"
)
type compressor struct {
encoderPool sync.Pool
decoderPool sync.Pool
}
func (c *compressor) Name() string {
return "zstd"
}
func init() {
c := &compressor{}
c.encoderPool.New = func() any {
w, _ := zstd.NewWriter(nil,
zstd.WithEncoderLevel(zstd.SpeedBetterCompression),
zstd.WithNoEntropyCompression(true),
)
return &writer{
Encoder: w,
pool: &c.encoderPool,
}
}
encoding.RegisterCompressor(c)
}
type writer struct {
*zstd.Encoder
pool *sync.Pool
}
func (c *compressor) Compress(w io.Writer) (io.WriteCloser, error) {
z := c.encoderPool.Get().(*writer)
z.Encoder.Reset(w)
return z, nil
}
func (z *writer) Close() error {
defer z.pool.Put(z)
return z.Encoder.Close()
}
type reader struct {
*zstd.Decoder
pool *sync.Pool
}
func (c *compressor) Decompress(r io.Reader) (io.Reader, error) {
z, inPool := c.decoderPool.Get().(*reader)
if !inPool {
newZ, err := zstd.NewReader(r,
zstd.WithDecoderLowmem(true),
zstd.WithDecoderMaxMemory(32*1024*1024), // 32MB
)
if err != nil {
return nil, err
}
return &reader{
Decoder: newZ,
pool: &c.decoderPool,
}, nil
}
if err := z.Reset(r); err != nil {
c.decoderPool.Put(z)
return nil, err
}
return z, nil
}
func (z *reader) Read(p []byte) (n int, err error) {
n, err = z.Decoder.Read(p)
if err == io.EOF {
z.pool.Put(z)
}
return n, err
}