forked from influxdata/influxdb
-
Notifications
You must be signed in to change notification settings - Fork 0
/
batch_string.go
144 lines (123 loc) · 3.66 KB
/
batch_string.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
package tsm1
import (
"encoding/binary"
"errors"
"fmt"
"math"
"unsafe"
"github.com/golang/snappy"
)
// ErrStringArrayEncodeTooLarge reports that the encoded length of a slice of
// strings is too large for StringArrayEncodeAll to encode.
var ErrStringArrayEncodeTooLarge = errors.New("StringArrayEncodeAll: source length too large")

// Errors returned by StringArrayDecodeAll when the decompressed block is
// malformed.
var (
	// errStringBatchDecodeInvalidStringLength is returned when a string's
	// length prefix cannot be read as a uvarint.
	errStringBatchDecodeInvalidStringLength = fmt.Errorf("StringArrayDecodeAll: invalid encoded string length")

	// errStringBatchDecodeLengthOverflow is returned when a string's length
	// prefix is too large to compute the string's end offset.
	errStringBatchDecodeLengthOverflow = fmt.Errorf("StringArrayDecodeAll: length overflow")

	// errStringBatchDecodeShortBuffer is returned when a string's length
	// prefix points past the end of the decompressed data.
	errStringBatchDecodeShortBuffer = fmt.Errorf("StringArrayDecodeAll: short buffer")
)
// StringArrayEncodeAll encodes src into b, returning b and any error encountered.
// The returned slice may be of a different length and capacity to b.
//
// The output is one header byte (stringCompressedSnappy << 4) followed by a
// snappy-compressed payload. The uncompressed payload is, for each string in
// src, its byte length as a uvarint followed by its bytes. Snappy is currently
// the only compression scheme used.
//
// When the payload could be too large to encode, b[:0] and
// ErrStringArrayEncodeTooLarge are returned.
func StringArrayEncodeAll(src []string, b []byte) ([]byte, error) {
	// Upper bound on the uncompressed payload: binary.MaxVarintLen32 bytes are
	// reserved for each length prefix, plus 2 bytes, which also cover the
	// output for an empty src (header byte + empty snappy block). The sum is
	// kept in int64 so it cannot overflow int on 32-bit platforms.
	srcSz64 := 2 + int64(len(src))*binary.MaxVarintLen32
	for i := range src {
		srcSz64 += int64(len(src[i]))
	}

	// Reject payloads above math.MaxUint32 bytes and, on 32-bit platforms,
	// payloads whose size does not fit in an int.
	if srcSz64 > math.MaxUint32 || int64(int(srcSz64)) != srcSz64 {
		return b[:0], ErrStringArrayEncodeTooLarge
	}
	srcSz := int(srcSz64)

	// The compressed output (header byte + snappy block) goes at the front of
	// b; reserve the worst-case snappy size for it.
	compressedSz64 := int64(0)
	if len(src) > 0 {
		mle := snappy.MaxEncodedLen(srcSz)
		if mle < 0 {
			// snappy documents any negative value as "too large to encode".
			return b[:0], ErrStringArrayEncodeTooLarge
		}
		compressedSz64 = int64(mle) + 1 // + header byte
	}

	totSz64 := srcSz64 + compressedSz64
	if int64(int(totSz64)) != totSz64 {
		return b[:0], ErrStringArrayEncodeTooLarge
	}
	compressedSz, totSz := int(compressedSz64), int(totSz64)

	if cap(b) < totSz {
		b = make([]byte, totSz)
	} else {
		b = b[:totSz]
	}

	if len(src) == 0 {
		// An empty input encodes as the header byte followed by an empty snappy
		// block, which is the single byte 0x00 (a zero uvarint length). b may be
		// a reused buffer holding stale data, so b[1] must be written explicitly.
		b[0] = stringCompressedSnappy << 4
		b[1] = 0
		return b[:2], nil
	}

	// Write the uncompressed payload *after* the region reserved for the
	// compressed output so the two never overlap. Compressing into the front
	// of b means the returned slice starts at b[0], keeping the entire capacity
	// available for subsequent reuse.
	dta := b[compressedSz:]
	n := 0
	for i := range src {
		n += binary.PutUvarint(dta[n:], uint64(len(src[i])))
		n += copy(dta[n:], src[i])
	}
	dta = dta[:n]

	dst := b[:compressedSz]
	dst[0] = stringCompressedSnappy << 4
	// dst[1:] holds snappy.MaxEncodedLen(srcSz) >= snappy.MaxEncodedLen(len(dta))
	// bytes, so snappy.Encode writes into it and res is a prefix of dst[1:].
	res := snappy.Encode(dst[1:], dta)
	return dst[:len(res)+1], nil
}
// StringArrayDecodeAll decodes a block produced by StringArrayEncodeAll into
// dst, reusing dst's capacity when it has any, and returns the decoded strings.
//
// The first byte of b holds the encoding type. Snappy is currently the only
// scheme, so that byte is not inspected. An empty b yields an empty result.
//
// The returned strings point directly into the buffer produced by
// snappy.Decode, never into b, so the caller may reuse b afterwards.
//
// On error an empty, non-nil slice is returned. Errors from snappy are
// wrapped and can be inspected with errors.Is / errors.As.
func StringArrayDecodeAll(b []byte, dst []string) ([]string, error) {
	if len(b) == 0 {
		return []string{}, nil
	}

	// Decoding with a nil destination ensures the result never aliases b. This
	// matters because the strings built below reference it without copying.
	data, err := snappy.Decode(nil, b[1:])
	if err != nil {
		return []string{}, fmt.Errorf("failed to decode string block: %w", err)
	}

	// Work over dst's full capacity; start with 64 slots when it has none.
	if cap(dst) == 0 {
		dst = make([]string, 64)
	} else {
		dst = dst[:cap(dst)]
	}

	j := 0
	for i := 0; i < len(data); {
		length, n := binary.Uvarint(data[i:])
		if n <= 0 {
			return []string{}, errStringBatchDecodeInvalidStringLength
		}

		// Reject lengths that do not survive conversion to int. Values above
		// the int range become negative (64-bit) or are silently truncated
		// (32-bit); either would otherwise mis-slice the data.
		strLen := int(length)
		if strLen < 0 || uint64(strLen) != length {
			return []string{}, errStringBatchDecodeLengthOverflow
		}

		lower := i + n
		upper := lower + strLen
		if upper < lower {
			return []string{}, errStringBatchDecodeLengthOverflow
		}
		if upper > len(data) {
			return []string{}, errStringBatchDecodeShortBuffer
		}

		// NOTE: this optimization is critical for performance and to reduce
		// allocations. This is just as "safe" as string.Builder, which
		// returns a string mapped to the original byte slice
		s := data[lower:upper]
		val := *(*string)(unsafe.Pointer(&s))
		if j < len(dst) {
			dst[j] = val
		} else {
			dst = append(dst, val) // force a resize
			dst = dst[:cap(dst)]
		}
		j++

		// Advance past the length prefix and the string bytes.
		i = upper
	}
	return dst[:j], nil
}