-
Notifications
You must be signed in to change notification settings - Fork 0
/
block.go
241 lines (210 loc) · 5.57 KB
/
block.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
package lsm
import (
"bytes"
"encoding/binary"
"errors"
"io"
"sort"
)
var (
	// byteOrder is the byte order of the fixed-width uint32 that ends
	// every block.
	byteOrder = binary.LittleEndian

	// errCorruptBlock is returned when a block's bytes cannot be parsed.
	errCorruptBlock = errors.New("corrupt block")
)
// readBlock parses the raw bytes of a finished block.
//
// The expected layout is
//
//	[entries] [restart offsets | uvarint each] [footer offset | uint32]
//
// where the trailing uint32 is the offset at which the restart offsets
// begin. The returned block's data is raw[:footerOff] (it aliases raw, no
// copy is made) and its restarts hold the decoded restart offsets.
//
// errCorruptBlock is returned when raw is too short to hold the trailing
// uint32, when the footer offset points past the start of that uint32, or
// when a restart offset does not fall inside the entry data. A restart
// offset that cannot be decoded is reported with the error returned by
// binary.ReadUvarint.
func readBlock(raw []byte) (block, error) {
	const trailerLen = 4 // size of the fixed-width footer offset

	if len(raw) < trailerLen {
		return block{}, errCorruptBlock
	}
	footerEnd := len(raw) - trailerLen
	footerOff := byteOrder.Uint32(raw[footerEnd:])
	// The footer must end where the trailing uint32 starts; an offset
	// beyond that would make the slice expression below panic. Comparing
	// as uint64 keeps the check correct where int is 32 bits wide.
	if uint64(footerOff) > uint64(footerEnd) {
		return block{}, errCorruptBlock
	}

	footer := bytes.NewReader(raw[footerOff:footerEnd])
	restarts := make([]uint32, 0)
	for footer.Len() > 0 {
		off, err := binary.ReadUvarint(footer)
		if err != nil {
			return block{}, err
		}
		// A restart offset has to address an entry, so it must lie
		// strictly inside the entry data. Checking the uint64 value
		// directly also rejects offsets that would not fit in a uint32.
		if off >= uint64(footerOff) {
			return block{}, errCorruptBlock
		}
		restarts = append(restarts, uint32(off))
	}

	return block{
		data:     raw[:footerOff],
		restarts: restarts,
	}, nil
}
// block is a parsed view of a finished block, as produced by readBlock.
type block struct {
	// data holds the encoded entries: the raw block bytes that precede
	// the footer.
	data []byte // raw block, without footer
	// restarts holds the restart offsets decoded from the footer. Each is
	// an offset into data.
	restarts []uint32 // parsed footer
}
// iter walks the block from its first entry to its last, invoking fn
// once per key-value pair.
//
// The key slice passed to fn belongs to the iterator and is overwritten
// when the following entry is decoded, so fn must copy it to keep it
// past its own return. The value slice points into the block's bytes;
// holding on to it keeps the whole block reachable.
//
// Iteration stops at the first entry that fails to decode and that error
// is returned; fn is not called for the failing entry.
func (b block) iter(fn func(key, val []byte)) error {
	cur := blockIterator{block: b}
	for {
		if !cur.hasNext() {
			return nil
		}
		if err := cur.next(); err != nil {
			return err
		}
		fn(cur.key, cur.value)
	}
}
// iterAt seeks to key and returns an iterator that has just decoded the
// first entry whose key is >= key: that entry is available in the
// iterator's key and value fields, and the following call to next moves
// on to the entry after it. If every key in the block is smaller, the
// iterator is left on the last entry with hasNext reporting false.
//
// The seek binary searches the restart points and then scans linearly
// from the last restart point whose key is < key, or from the start of
// the block when there is none. This presumes entries are stored in
// ascending key order.
//
// If an entry fails to decode, the iterator is returned in whatever state
// it reached, together with the decode error.
func (b block) iterAt(key []byte) (blockIterator, error) {
	it := blockIterator{block: b}

	// Binary search for the first restart point whose key is >= key.
	// Every probe decodes the entry stored at that restart point.
	var probeErr error
	atOrAfter := func(r int) bool {
		it.off = int(b.restarts[r])
		if err := it.next(); err != nil {
			probeErr = err
			return false
		}
		return bytes.Compare(it.key, key) >= 0
	}
	idx := sort.Search(len(b.restarts), atOrAfter)
	if probeErr != nil {
		return it, probeErr
	}

	// The target lies between restart points idx-1 and idx. When idx is
	// zero the scan has to begin at the start of the block, because the
	// first restart point may already be several keys in.
	start := 0
	if idx > 0 {
		start = int(b.restarts[idx-1])
	}
	it.off = start

	for it.hasNext() {
		if err := it.next(); err != nil {
			return it, err
		}
		if bytes.Compare(it.key, key) >= 0 {
			break
		}
	}
	return it, nil
}
// blockIterator decodes the entries of a block one at a time. It also
// provides ReadByte so that it can be passed to binary.ReadUvarint.
type blockIterator struct {
	// block is the block being iterated.
	block block
	// off is the offset into block.data of the next byte to be read.
	off int
	// entryOff is the offset into block.data at which the most recently
	// decoded entry starts.
	entryOff int
	// key and value hold the most recently decoded entry. key is rebuilt
	// in place by every call to next; value is a slice of block.data.
	key, value []byte
}
// hasNext reports whether any bytes of the block's data remain to be
// decoded, i.e. whether the read offset is still short of the end.
func (bi blockIterator) hasNext() bool {
	end := len(bi.block.data)
	return end > bi.off
}
// ReadByte returns the byte at the current offset of the block's data and
// advances the offset by one. Once the data is exhausted it returns
// io.ErrUnexpectedEOF and leaves the offset unchanged.
//
// It gives blockIterator the io.ByteReader method set, which is what
// binary.ReadUvarint requires of its argument.
func (bi *blockIterator) ReadByte() (byte, error) {
	data := bi.block.data
	if bi.off < len(data) {
		c := data[bi.off]
		bi.off++
		return c, nil
	}
	return 0, io.ErrUnexpectedEOF
}
// next decodes the entry at the current offset and advances past it.
//
// An entry is laid out as
//
//	[shared | uvarint] [nonshared | uvarint] [valueLen | uvarint] [key suffix] [value]
//
// where shared is the number of leading bytes the key has in common with
// the previously decoded key and nonshared is the length of the key
// suffix that follows. On success bi.key holds the full key, bi.value the
// value and bi.entryOff the offset at which the entry began. bi.key is
// rebuilt in place, so its contents are overwritten by the following
// call; bi.value is a slice of the block's data.
//
// A length that cannot be decoded is reported with the error returned by
// binary.ReadUvarint. Lengths that do not fit the remaining data, or a
// shared length longer than the previously decoded key, yield
// errCorruptBlock rather than a slice-bounds panic.
func (bi *blockIterator) next() error {
	startOff := bi.off
	shared, err := binary.ReadUvarint(bi)
	if err != nil {
		return err
	}
	nonshared, err := binary.ReadUvarint(bi)
	if err != nil {
		return err
	}
	valueLen, err := binary.ReadUvarint(bi)
	if err != nil {
		return err
	}

	// Validate the decoded lengths before slicing. ReadByte never moves
	// bi.off past the end of the data, so remaining cannot be negative.
	// The value length is compared against remaining-nonshared so that
	// the sum nonshared+valueLen is never computed on unchecked input.
	remaining := uint64(len(bi.block.data) - bi.off)
	if shared > uint64(len(bi.key)) || nonshared > remaining || valueLen > remaining-nonshared {
		return errCorruptBlock
	}
	keyLen, valLen := int(nonshared), int(valueLen)

	rest := bi.block.data[bi.off:]
	// Keep the shared prefix of the previous key and append the suffix.
	bi.key = append(bi.key[:shared], rest[:keyLen]...)
	// Cap the value's capacity so that an append by whoever receives it
	// cannot write over the entries that follow it in the block.
	bi.value = rest[keyLen : keyLen+valLen : keyLen+valLen]
	bi.off += keyLen + valLen
	bi.entryOff = startOff
	return nil
}
// blockBuilder generates blocks with prefix-compressed keys.
//
// Every restartInterval keys blockBuilder will restart the
// prefix compression. It saves the restart offset to restarts.
// Upon finishing the block, the restarts are added to the end.
// A reader may binary search through the restart points to find
// where to begin searching.
type blockBuilder struct {
	// buf accumulates the encoded entries and, once finish is called,
	// the footer.
	buf bytes.Buffer
	// lastKey is the key passed to the most recent add. The next key is
	// compressed against it.
	lastKey []byte
	// counter is the number of entries added since the last restart.
	counter int
	// restartInterval is the number of entries after which prefix
	// compression restarts.
	restartInterval int
	// restarts holds the offset into buf of every entry at which prefix
	// compression was restarted.
	restarts []int
	tmp      [binary.MaxVarintLen64]byte // varint scratch space
}
// reset empties the builder so that it can build another block.
//
// The entry buffer, the restarts slice and the last-key slice are
// truncated to zero length rather than released, so their backing storage
// is reused by later writes. restartInterval is left as it is.
func (bb *blockBuilder) reset() {
	bb.buf.Reset()
	bb.lastKey = bb.lastKey[:0]
	bb.counter = 0
	bb.restarts = bb.restarts[:0]
}
// size estimates the size of the finished block as the bytes buffered so
// far plus four bytes for every restart point recorded. It is only an
// approximation of what finish emits, which encodes each restart offset
// as a uvarint and appends a four-byte footer offset.
func (bb *blockBuilder) size() int {
	const perRestart = 4
	buffered := bb.buf.Len()
	footer := perRestart * len(bb.restarts)
	return buffered + footer
}
// finish appends the footer to the buffered entries and returns the
// complete block.
//
// The footer lists every restart offset as a uvarint and ends with a
// fixed-width uint32 holding the offset at which that list begins:
//
//	[r_0 | uvarint] [r_1 | uvarint] ... [r_n | uvarint] [offset of r_0 | uint32]
//
// The returned slice is the builder's own buffer, not a copy. The footer
// is written into that same buffer, so finish is meant to be called once
// per block.
func (bb *blockBuilder) finish() []byte {
	// Capture the footer position before any restart offset is written.
	footerStart := uint32(bb.buf.Len())

	for i := range bb.restarts {
		bb.putUvarint(bb.restarts[i])
	}

	trailer := [4]byte{}
	byteOrder.PutUint32(trailer[:], footerStart)
	bb.buf.Write(trailer[:])

	return bb.buf.Bytes()
}
// add appends the key-value pair to the block being built.
//
// The entry is written as three uvarint lengths (bytes of k shared with
// the previous key, remaining bytes of k, bytes of v) followed by the
// unshared suffix of k and then v. Once restartInterval entries have been
// added since the last restart, the entry's offset is recorded as a
// restart point and its key is written in full.
//
// k is copied into the builder, so the caller is free to reuse or modify
// its key buffer as soon as add returns. Readers binary search the
// restart points, so keys are presumably expected in ascending order;
// add itself does not check this.
func (bb *blockBuilder) add(k, v []byte) {
	shared := 0
	if bb.counter >= bb.restartInterval {
		// Restart prefix compression: remember where this entry begins
		// and store its key uncompressed (shared stays 0).
		bb.restarts = append(bb.restarts, bb.buf.Len())
		bb.counter = 0
	} else {
		// Count the leading bytes k has in common with the previous key.
		limit := len(bb.lastKey)
		if len(k) < limit {
			limit = len(k)
		}
		for shared < limit && bb.lastKey[shared] == k[shared] {
			shared++
		}
	}

	// Write the lengths: shared key, unshared key, value.
	bb.putUvarint(shared)
	bb.putUvarint(len(k) - shared)
	bb.putUvarint(len(v))

	// Write the unshared key bytes and the value.
	bb.buf.Write(k[shared:])
	bb.buf.Write(v)

	// Keep a private copy of k. Holding on to the caller's slice would
	// let a reused key buffer change lastKey behind the builder's back
	// and corrupt the shared-prefix length of the next entry.
	bb.lastKey = append(bb.lastKey[:0], k...)
	bb.counter++
}
// putUvarint appends v to the buffer in uvarint encoding, using the
// builder's scratch array so that no temporary slice is allocated. v is
// converted to uint64 before encoding.
func (bb *blockBuilder) putUvarint(v int) {
	scratch := bb.tmp[:]
	n := binary.PutUvarint(scratch, uint64(v))
	bb.buf.Write(scratch[:n])
}