-
Notifications
You must be signed in to change notification settings - Fork 0
/
sstable.go
377 lines (361 loc) · 8.35 KB
/
sstable.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
package lsmtree
import (
"bytes"
"fmt"
"io"
"io/fs"
"os"
"path/filepath"
"strings"
)
// ssTableIndex is the in-memory index of a single ss-table: the index
// records loaded from its index file plus cached boundary keys.
type ssTableIndex struct {
	// keys of the first and last records in data; keyInRange uses them as
	// inclusive range bounds
	first, last []byte
	// number of records supplied when the index was built
	count int
	// index records, in the order they were supplied
	data []*Index
}
// newSSTableIndex wraps the given index records in an ssTableIndex, caching
// the keys of the first and last record and the record count. The slice is
// stored as-is (not copied). A nil or empty input yields an index with nil
// boundary keys, a zero count, and a non-nil empty data slice.
func newSSTableIndex(index []*Index) *ssTableIndex {
	n := len(index) // len of a nil slice is 0, so this covers nil too
	if n == 0 {
		return &ssTableIndex{data: make([]*Index, 0)}
	}
	return &ssTableIndex{
		first: index[0].Key,
		last:  index[n-1].Key,
		count: n,
		data:  index,
	}
}
// Len returns the number of index records currently held in data.
func (ssti *ssTableIndex) Len() int {
	n := len(ssti.data)
	return n
}
// close drops every reference held by the index. Each slot of data is set to
// nil first, because the backing array is the slice originally handed to
// newSSTableIndex and may still be reachable elsewhere. The index is then
// reset to its zero value (nil keys, zero count, nil data).
func (ssti *ssTableIndex) close() {
	for idx := range ssti.data {
		ssti.data[idx] = nil
	}
	*ssti = ssTableIndex{}
}
// ssTable describes one sorted-string table: where its data file lives,
// an optional open handle to that data file, and its in-memory key index.
type ssTable struct {
	// location of the table's data file (populated by openSSTable)
	path string
	// data file handle read by ReadAt; nil means not opened
	// (openSSTable leaves it nil)
	fd *os.File
	// index records loaded from the table's index file
	index *ssTableIndex
}
// createSSTable flushes the mem-table memt to a new level-0 ss-table under dir.
//
// The table's sequence number is the count of data files (names ending in
// dataFileSuffix) already present in the level-0 directory. One data file and
// one index file are created for that sequence. Every entry visited by
// memt.rangeFront is written to the data file via writeEntry, and an Index
// record holding the entry's key and returned offset is written to the index
// file via writeIndex. Both files are fsynced and then closed.
//
// Any failure while writing, syncing or closing is returned as an error
// instead of panicking. On a write failure, the partially written files are
// left on disk.
func createSSTable(dir string, memt *rbTree) (err error) {
	// level-0 holds newly flushed ss-tables
	path := filepath.Join(dir, levelToDir(0))
	files, err := os.ReadDir(path)
	if err != nil {
		return err
	}
	// The next sequence number is the number of data files already present.
	// NOTE(review): if a lower-numbered table is ever removed from level 0,
	// this count can name a file that already exists. The open flags below
	// carry neither O_EXCL nor O_TRUNC; confirm that cannot happen.
	var seq int64
	for _, file := range files {
		if !file.IsDir() && strings.HasSuffix(file.Name(), dataFileSuffix) {
			seq++
		}
	}
	dataFile, err := openDataFile(path, seq, os.O_CREATE|os.O_WRONLY)
	if err != nil {
		return err
	}
	// always close; surface a close failure only if nothing failed earlier
	defer func() {
		if cerr := dataFile.Close(); cerr != nil && err == nil {
			err = fmt.Errorf("closing dataFile: %w", cerr)
		}
	}()
	indexFile, err := openIndexFile(path, seq, os.O_CREATE|os.O_WRONLY)
	if err != nil {
		return err
	}
	defer func() {
		if cerr := indexFile.Close(); cerr != nil && err == nil {
			err = fmt.Errorf("closing indexFile: %w", cerr)
		}
	}()
	// Write entries and their index records, remembering the first failure.
	// Returning false presumably stops rangeFront. The guard at the top also
	// prevents further writes if the iterator keeps calling us anyway.
	var writeErr error
	memt.rangeFront(func(e *Entry) bool {
		if writeErr != nil {
			return false
		}
		offset, err := writeEntry(dataFile, e)
		if err != nil {
			writeErr = fmt.Errorf("writing entry to data file: %w", err)
			return false
		}
		if _, err := writeIndex(indexFile, &Index{Key: e.Key, Offset: offset}); err != nil {
			writeErr = fmt.Errorf("writing index to index file: %w", err)
			return false
		}
		return true
	})
	if writeErr != nil {
		return writeErr
	}
	if err := dataFile.Sync(); err != nil {
		return err
	}
	if err := indexFile.Sync(); err != nil {
		return err
	}
	return nil
}
// openSSTable loads the index file for table seq in directory path and
// returns an ssTable whose in-memory index holds every decoded record.
// The table's path is the data file for seq joined onto the directory.
// The data file itself is not opened, so the returned table's fd is nil.
//
// Reading stops cleanly at io.EOF or io.ErrUnexpectedEOF, which means a
// truncated trailing record is ignored. Any other read error is returned
// together with a nil table.
func openSSTable(path string, seq int64) (*ssTable, error) {
	indexFile, err := openIndexFile(path, seq, os.O_RDONLY)
	if err != nil {
		return nil, err
	}
	var index []*Index
	for {
		i, err := readIndex(indexFile)
		if err != nil {
			if err == io.EOF || err == io.ErrUnexpectedEOF {
				break
			}
			// Close the file, but report the read error: it is the root cause.
			// Overwriting err with Close's result (as before) returned
			// (nil, nil) whenever the close succeeded.
			_ = indexFile.Close()
			return nil, err
		}
		index = append(index, i)
	}
	if err := indexFile.Close(); err != nil {
		return nil, err
	}
	return &ssTable{
		// keep the directory: a bare file name would resolve against the cwd
		path:  filepath.Join(path, toDataFileName(seq)),
		fd:    nil,
		index: newSSTableIndex(index),
	}, nil
}
// keyInRange reports whether key lies within the inclusive range
// [first, last] recorded in the table's index. It is a cheap pre-check:
// true means the key may be in the table, not that it is.
// A nil key, a missing index, or an index with no records yields false.
// Without the emptiness guard, nil first/last bounds make an empty key
// look "in range", and a nil index would panic.
func (sst *ssTable) keyInRange(key []byte) bool {
	if key == nil || sst.index == nil || len(sst.index.data) == 0 {
		return false
	}
	return isBetween(sst.index.first, key, sst.index.last)
}
// isBetween reports whether lo <= key <= hi in lexicographic byte order
// (both bounds inclusive). A nil slice compares equal to an empty one.
func isBetween(lo, key, hi []byte) bool {
	aboveLo := bytes.Compare(key, lo) >= 0
	belowHi := bytes.Compare(key, hi) <= 0
	return aboveLo && belowHi
}
// locateSSTable walks the directory tree rooted at base and returns the path
// of a data file (name ending in dataFileSuffix) that contains an entry whose
// key equals key. If several data files contain the key, the one visited last
// by filepath.WalkDir (lexical order) wins. An empty path with a nil error
// means no data file holds the key.
//
// Each data file is scanned sequentially with readEntry until io.EOF or
// io.ErrUnexpectedEOF. Any other read, open, or close error aborts the walk.
func locateSSTable(base string, key []byte) (string, error) {
	var sstPath string
	err := filepath.WalkDir(base, func(path string, de fs.DirEntry, err error) error {
		if err != nil {
			fmt.Fprintf(os.Stderr, "prevent panic by handling failure accessing a path %q: %v\n", path, err)
			return err
		}
		// only ss-table data files are scanned (suffix match, as elsewhere)
		if de.IsDir() || !strings.HasSuffix(de.Name(), dataFileSuffix) {
			return nil
		}
		found, err := dataFileHasKey(path, key)
		if err != nil {
			return err
		}
		if found {
			sstPath = path
		}
		return nil
	})
	if err != nil {
		return "", err
	}
	return sstPath, nil
}

// dataFileHasKey reports whether the data file at path holds an entry whose
// key is exactly equal to key. The file is always closed; a close error is
// reported only if the scan itself succeeded.
func dataFileHasKey(path string, key []byte) (found bool, err error) {
	dataFile, err := os.OpenFile(path, os.O_RDONLY, 0666)
	if err != nil {
		return false, err
	}
	defer func() {
		if cerr := dataFile.Close(); cerr != nil && err == nil {
			err = cerr
		}
	}()
	for {
		e, rerr := readEntry(dataFile)
		if rerr != nil {
			if rerr == io.EOF || rerr == io.ErrUnexpectedEOF {
				return false, nil
			}
			return false, rerr
		}
		// exact match; a substring match would report "a" found in "abc"
		if bytes.Equal(e.Key, key) {
			return true, nil
		}
	}
}
/*
func searchInSSTablesOLD(base string, key []byte) (*Entry, error) {
// read the base dir for this level
dirs, err := os.ReadDir(base)
if err != nil {
return nil, err
}
// iterate dirs
for _, dir := range dirs {
// skip anything that is not a directory
if !dir.IsDir() {
continue
}
// now let us read the files within this level
files, err := os.ReadDir(dir.Name())
if err != nil {
return nil, err
}
// visit each file
for _, file := range files {
// if the file is not a ss-table data file, continue
if file.IsDir() || !strings.HasSuffix(file.Name(), dataFileSuffix) {
continue // skip to the next file
}
// get the sequence from the data file name
seq, err := fromDataFileName(file.Name())
if err != nil {
return nil, err
}
// if the file is a ss-table, open it
sst, err := openSSTable(dir.Name(), seq)
if err != nil {
return nil, err
}
// perform prelim check to see if the provided
// key may fall in the range of this table
if ok := sst.keyInRange(key); !ok {
// if the key is not in the range, we can
// skip to the next table straight away
continue
}
// if the key does fall in the range than there
// is a very high chance that it will be found
// within this table. perform a search on the
// ss-table for the provided key and return
e, err := searchSSTable(sst.path, key)
if err != nil {
return nil, err
}
// check and return found entry
if e != nil && !e.hasTombstone() {
return e, nil
}
}
}
return nil, ErrNotFound
}
func searchSSTableOLD(dir string, key []byte) (*Entry, error) {
// read the base dir for this level
dirs, err := os.ReadDir(sstm.baseDir)
if err != nil {
return err
}
// iterate dirs
for _, dir := range dirs {
// skip anything that is not a directory
if !dir.IsDir() {
continue
}
// get level
level, err := dirToLevel(dir.Name())
if err != nil {
return err
}
// add level to levels
if _, ok := sstm.level[level]; !ok {
sstm.level[level] = 0
}
// now let us add the file count within those levels
files, err := os.ReadDir(dir.Name())
if err != nil {
return err
}
// count the files
for _, file := range files {
// if the file is a sst-table data file, increment
if !file.IsDir() && strings.HasSuffix(file.Name(), dataFileSuffix) {
sstm.level[level]++
sstm.sstcount++
}
}
}
return nil
}
*/
// ReadAt decodes the entry stored at offset in the table's open data file
// and verifies its checksum over the entry's key bytes followed by its value
// bytes. It returns ErrFileClosed when the table has no open data file.
func (sst *ssTable) ReadAt(offset int64) (*Entry, error) {
	if sst.fd == nil {
		return nil, ErrFileClosed
	}
	e, err := readEntryAt(sst.fd, offset)
	if err != nil {
		return nil, err
	}
	// Build the checksum input in a fresh buffer. append(e.Key, e.Value...)
	// writes into e.Key's backing array whenever it has spare capacity,
	// which can clobber bytes aliased by e.Value or other data.
	buf := make([]byte, 0, len(e.Key)+len(e.Value))
	buf = append(buf, e.Key...)
	buf = append(buf, e.Value...)
	if err := checkCRC(e, checksum(buf)); err != nil {
		return nil, err
	}
	return e, nil
}
/*
func getLevelFromSize(size int64) int {
switch {
case size > 0<<20 && size < 1<<21: // level-0 (2 MB) max=4
return 0
case size > 1<<22 && size < 1<<23: // level-1 (8 MB) max=4
return 1
case size > 1<<24 && size < 1<<25: // level-2 (32 MB) max=4
return 2
case size > 1<<26 && size < 1<<27: // level-3 (128 MB) max=4
return 3
default:
return 4 // oddballs that will need gc for sure
}
}
*/