forked from influxdata/influxdb
-
Notifications
You must be signed in to change notification settings - Fork 0
/
reader.go
1639 lines (1334 loc) · 42.2 KB
/
reader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
package tsm1
import (
	"bytes"
	"encoding/binary"
	"errors"
	"fmt"
	"io"
	"math"
	"os"
	"runtime"
	"sort"
	"sync"
	"sync/atomic"

	"github.com/influxdata/influxdb/pkg/bytesutil"
	"github.com/influxdata/influxdb/pkg/file"
	"github.com/influxdata/influxdb/tsdb"
)
// ErrFileInUse is returned when attempting to remove or close a TSM file that is still being used.
//
// errors.New is used rather than fmt.Errorf since the message contains no
// format verbs (staticcheck S1028).
var ErrFileInUse = errors.New("file still in use")
// nilOffset is the value written into an offsets slot to mark that position as
// deleted. The four 0xFF bytes decode (big-endian) to the max uint32, which is
// an invalid position. We don't use 0 as 0 is actually a valid position.
var nilOffset = []byte{255, 255, 255, 255}
// TSMReader is a reader for a TSM file.
type TSMReader struct {
	// refs is the count of active references to this reader; accessed
	// atomically. NOTE(review): keeping it first presumably guarantees 64-bit
	// alignment on 32-bit platforms for sync/atomic — confirm before reordering.
	refs   int64
	refsWG sync.WaitGroup

	madviseWillNeed bool // Hint to the kernel with MADV_WILLNEED.

	// mu guards the fields below (accessor, size, lastModified, ...).
	mu sync.RWMutex

	// accessor provides access and decoding of blocks for the reader.
	accessor blockAccessor

	// index is the index of all blocks.
	index TSMIndex

	// tombstoner ensures tombstoned keys are not available by the index.
	tombstoner *Tombstoner

	// size is the size of the file on disk.
	size int64

	// lastModified is the last time this file was modified on disk, in
	// nanoseconds since the epoch.
	lastModified int64

	// deleteMu limits concurrent deletes; held across a whole BatchDelete
	// (until Commit or Rollback).
	deleteMu sync.Mutex
}
// TSMIndex represents the index section of a TSM file. The index records all
// blocks, their locations, sizes, min and max times.
type TSMIndex interface {
	// Delete removes the given keys from the index.
	Delete(keys [][]byte)

	// DeleteRange removes the given keys with data between minTime and maxTime from the index.
	DeleteRange(keys [][]byte, minTime, maxTime int64)

	// ContainsKey returns true if the given key may exist in the index. This func is faster than
	// Contains but may return false positives.
	ContainsKey(key []byte) bool

	// Contains returns true if the given key exists in the index.
	Contains(key []byte) bool

	// ContainsValue returns true if key and time might exist in this file. This function could
	// return true even though the actual point does not exist. For example, the key may
	// exist in this file, but not have a point exactly at time t.
	ContainsValue(key []byte, timestamp int64) bool

	// Entries returns all index entries for a key.
	Entries(key []byte) []IndexEntry

	// ReadEntries reads the index entries for key into entries.
	ReadEntries(key []byte, entries *[]IndexEntry) []IndexEntry

	// Entry returns the index entry for the specified key and timestamp. If no entry
	// matches the key and timestamp, nil is returned.
	Entry(key []byte, timestamp int64) *IndexEntry

	// Key returns the key in the index at the given position, using entries to avoid allocations.
	Key(index int, entries *[]IndexEntry) ([]byte, byte, []IndexEntry)

	// KeyAt returns the key in the index at the given position.
	KeyAt(index int) ([]byte, byte)

	// KeyCount returns the count of unique keys in the index.
	KeyCount() int

	// Seek returns the position in the index where key <= value in the index.
	Seek(key []byte) int

	// OverlapsTimeRange returns true if the time range of the file intersect min and max.
	OverlapsTimeRange(min, max int64) bool

	// OverlapsKeyRange returns true if the min and max keys of the file overlap the arguments min and max.
	OverlapsKeyRange(min, max []byte) bool

	// Size returns the size of the current index in bytes.
	Size() uint32

	// TimeRange returns the min and max time across all keys in the file.
	TimeRange() (int64, int64)

	// TombstoneRange returns ranges of time that are deleted for the given key.
	TombstoneRange(key []byte) []TimeRange

	// KeyRange returns the min and max keys in the file.
	KeyRange() ([]byte, []byte)

	// Type returns the block type of the values stored for the key. Returns one of
	// BlockFloat64, BlockInt64, BlockBool, BlockString. If key does not exist,
	// an error is returned.
	Type(key []byte) (byte, error)

	// UnmarshalBinary populates an index from an encoded byte slice
	// representation of an index.
	UnmarshalBinary(b []byte) error

	// Close closes the index and releases any resources.
	Close() error
}
// BlockIterator allows iterating over each block in a TSM file in order. It provides
// raw access to the block bytes without decoding them.
type BlockIterator struct {
	r *TSMReader

	// i is the current key index
	i int

	// n is the total number of keys
	n int

	// key is the current key; entries holds the remaining (unconsumed) index
	// entries for it, the first being the current block.
	key     []byte
	cache   []IndexEntry // scratch buffer reused across Key() calls
	entries []IndexEntry
	err     error
	typ     byte
}
// PeekNext returns the key of the block that the iterator will produce after
// the current one, or nil when no further block is available.
func (b *BlockIterator) PeekNext() []byte {
	switch {
	case len(b.entries) > 1:
		// More blocks remain for the current key.
		return b.key
	case b.n-b.i > 1:
		// The first block of the next key in the index.
		next, _ := b.r.KeyAt(b.i + 1)
		return next
	default:
		return nil
	}
}
// Next returns true if there are more blocks to iterate through. It first
// advances through the remaining entries (blocks) of the current key, then
// moves on to the next key in the index.
func (b *BlockIterator) Next() bool {
	if b.err != nil {
		return false
	}

	// Both the keys and the current key's entries are exhausted.
	if b.n-b.i == 0 && len(b.entries) == 0 {
		return false
	}

	// Consume the current entry; if more remain for this key we're done.
	if len(b.entries) > 0 {
		b.entries = b.entries[1:]
		if len(b.entries) > 0 {
			return true
		}
	}

	// Advance to the next key.
	if b.n-b.i > 0 {
		b.key, b.typ, b.entries = b.r.Key(b.i, &b.cache)
		b.i++

		// If there were deletes on the TSMReader, then our index is now off and we
		// can't proceed. What we just read may not actually be the next block.
		if b.n != b.r.KeyCount() {
			b.err = fmt.Errorf("delete during iteration")
			return false
		}

		if len(b.entries) > 0 {
			return true
		}
	}

	return false
}
// Read returns the current block's key, time range, block type, checksum and
// raw (still-encoded) bytes, or any error previously recorded by the iterator.
func (b *BlockIterator) Read() (key []byte, minTime int64, maxTime int64, typ byte, checksum uint32, buf []byte, err error) {
	if b.err != nil {
		err = b.err
		return
	}

	entry := &b.entries[0]
	checksum, buf, err = b.r.ReadBytes(entry, nil)
	if err != nil {
		return nil, 0, 0, 0, 0, nil, err
	}
	return b.key, entry.MinTime, entry.MaxTime, b.typ, checksum, buf, nil
}
// Err returns any error encountered during iteration.
func (b *BlockIterator) Err() error {
	return b.err
}
type tsmReaderOption func(*TSMReader)
// WithMadviseWillNeed is an option for specifying whether to provide a
// MADV_WILLNEED hint to the kernel when the file is mmap'd.
var WithMadviseWillNeed = func(willNeed bool) tsmReaderOption {
	return func(t *TSMReader) {
		t.madviseWillNeed = willNeed
	}
}
// NewTSMReader returns a new TSMReader from the given file. It records the
// file's size and mod time, mmaps the file, loads the index, and applies any
// persisted tombstones so deleted data is not visible through the index.
func NewTSMReader(f *os.File, options ...tsmReaderOption) (*TSMReader, error) {
	t := &TSMReader{}
	for _, option := range options {
		option(t)
	}

	stat, err := f.Stat()
	if err != nil {
		return nil, err
	}
	t.size = stat.Size()
	t.lastModified = stat.ModTime().UnixNano()
	t.accessor = &mmapAccessor{
		f:            f,
		mmapWillNeed: t.madviseWillNeed,
	}

	index, err := t.accessor.init()
	if err != nil {
		return nil, err
	}

	t.index = index
	// ContainsKey lets the tombstoner cheaply filter tombstones that cannot
	// apply to this file.
	t.tombstoner = NewTombstoner(t.Path(), index.ContainsKey)

	if err := t.applyTombstones(); err != nil {
		return nil, err
	}

	return t, nil
}
// WithObserver sets the observer for the TSM reader. The observer is forwarded
// to the tombstoner so it is notified of tombstone file changes.
func (t *TSMReader) WithObserver(obs tsdb.FileStoreObserver) {
	t.tombstoner.WithObserver(obs)
}
// applyTombstones walks all persisted tombstones and deletes the covered
// keys/time ranges from the in-memory index. Keys are accumulated into batches
// of up to 4096 so DeleteRange is called once per batch; a batch is flushed
// early whenever the tombstone time range changes, since all keys in one
// DeleteRange call must share the same (Min, Max).
func (t *TSMReader) applyTombstones() error {
	var cur, prev Tombstone
	batch := make([][]byte, 0, 4096)

	if err := t.tombstoner.Walk(func(ts Tombstone) error {
		cur = ts
		// Time range changed: flush what we have under the previous range.
		if len(batch) > 0 {
			if prev.Min != cur.Min || prev.Max != cur.Max {
				t.index.DeleteRange(batch, prev.Min, prev.Max)
				batch = batch[:0]
			}
		}

		// Copy the tombstone key and re-use the buffers to avoid allocations.
		n := len(batch)
		batch = batch[:n+1]
		if cap(batch[n]) < len(ts.Key) {
			batch[n] = make([]byte, len(ts.Key))
		} else {
			batch[n] = batch[n][:len(ts.Key)]
		}
		copy(batch[n], ts.Key)

		if len(batch) >= 4096 {
			t.index.DeleteRange(batch, prev.Min, prev.Max)
			batch = batch[:0]
		}
		prev = ts
		return nil
	}); err != nil {
		return fmt.Errorf("init: read tombstones: %v", err)
	}

	// Flush any remaining keys under the last seen time range.
	if len(batch) > 0 {
		t.index.DeleteRange(batch, cur.Min, cur.Max)
	}

	return nil
}
// Free asks the accessor to release resources it can recover lazily later
// (delegated to the block accessor's free implementation).
func (t *TSMReader) Free() error {
	t.mu.RLock()
	err := t.accessor.free()
	t.mu.RUnlock()
	return err
}
// Path returns the path of the file the TSMReader was initialized with.
func (t *TSMReader) Path() string {
	t.mu.RLock()
	defer t.mu.RUnlock()
	return t.accessor.path()
}
// Key returns the key and the underlying entry at the numeric index,
// delegating to the TSM index. entries is reused to avoid allocations.
func (t *TSMReader) Key(index int, entries *[]IndexEntry) ([]byte, byte, []IndexEntry) {
	return t.index.Key(index, entries)
}

// KeyAt returns the key and key type at position idx in the index.
func (t *TSMReader) KeyAt(idx int) ([]byte, byte) {
	return t.index.KeyAt(idx)
}

// Seek returns the position in the index where key <= the key stored there.
func (t *TSMReader) Seek(key []byte) int {
	return t.index.Seek(key)
}
// ReadAt returns the decoded values corresponding to the given index entry,
// appending into vals where possible.
func (t *TSMReader) ReadAt(entry *IndexEntry, vals []Value) ([]Value, error) {
	t.mu.RLock()
	defer t.mu.RUnlock()
	return t.accessor.readBlock(entry, vals)
}
// Read returns the values corresponding to the block at the given key and timestamp.
func (t *TSMReader) Read(key []byte, timestamp int64) ([]Value, error) {
	t.mu.RLock()
	defer t.mu.RUnlock()
	return t.accessor.read(key, timestamp)
}
// ReadAll returns all values for a key in all blocks.
func (t *TSMReader) ReadAll(key []byte) ([]Value, error) {
	t.mu.RLock()
	defer t.mu.RUnlock()
	return t.accessor.readAll(key)
}
// ReadBytes returns the checksum and the raw, still-encoded block bytes for
// the given index entry; b is passed through to the accessor as a destination
// buffer.
func (t *TSMReader) ReadBytes(e *IndexEntry, b []byte) (uint32, []byte, error) {
	t.mu.RLock()
	defer t.mu.RUnlock()
	return t.accessor.readBytes(e, b)
}
// Type returns the block type of values stored at the given key, delegating
// to the TSM index.
func (t *TSMReader) Type(key []byte) (byte, error) {
	return t.index.Type(key)
}
// Close closes the TSMReader. It first waits for all outstanding references
// (see Ref/Unref) to be released, then closes the accessor and the index
// under the write lock.
func (t *TSMReader) Close() error {
	t.refsWG.Wait()

	t.mu.Lock()
	defer t.mu.Unlock()

	if err := t.accessor.close(); err != nil {
		return err
	}

	return t.index.Close()
}
// Ref records a usage of this TSMReader. If there are active references
// when the reader is closed or removed, the reader will remain open until
// there are no more references.
func (t *TSMReader) Ref() {
	// The atomic counter backs InUse(); the WaitGroup is what Close() blocks on.
	atomic.AddInt64(&t.refs, 1)
	t.refsWG.Add(1)
}
// Unref removes a usage record of this TSMReader. If the Reader was closed
// by another goroutine while there were active references, the file will
// be closed and removed.
func (t *TSMReader) Unref() {
	atomic.AddInt64(&t.refs, -1)
	t.refsWG.Done()
}
// InUse reports whether the TSMReader currently has any active references.
func (t *TSMReader) InUse() bool {
	return atomic.LoadInt64(&t.refs) > 0
}
// Remove removes any underlying files stored on disk for this reader.
// It acquires the write lock and delegates to remove().
func (t *TSMReader) Remove() error {
	t.mu.Lock()
	defer t.mu.Unlock()
	return t.remove()
}
// Rename renames the underlying file to the new path.
func (t *TSMReader) Rename(path string) error {
	t.mu.Lock()
	err := t.accessor.rename(path)
	t.mu.Unlock()
	return err
}
// remove deletes the TSM file and any associated tombstone files from disk.
// It fails with ErrFileInUse while the reader still has active references.
// Callers must hold the write lock.
func (t *TSMReader) remove() error {
	path := t.accessor.path()

	if t.InUse() {
		return ErrFileInUse
	}

	if path != "" {
		if err := os.RemoveAll(path); err != nil {
			return err
		}
	}

	return t.tombstoner.Delete()
}
// Contains returns whether the given key is present in the index.
func (t *TSMReader) Contains(key []byte) bool {
	return t.index.Contains(key)
}
// ContainsValue returns true if key and time might exist in this file. This function could
// return true even though the actual point does not exist. For example, the key may
// exist in this file, but not have a point exactly at time t.
func (t *TSMReader) ContainsValue(key []byte, ts int64) bool {
	return t.index.ContainsValue(key, ts)
}
// DeleteRange removes the given points for keys between minTime and maxTime.
// The series keys passed in must be sorted. The delete is performed as a
// single batch: it is rolled back on error, committed otherwise.
func (t *TSMReader) DeleteRange(keys [][]byte, minTime, maxTime int64) error {
	if len(keys) == 0 {
		return nil
	}

	b := t.BatchDelete()
	err := b.DeleteRange(keys, minTime, maxTime)
	if err == nil {
		return b.Commit()
	}
	b.Rollback()
	return err
}
// Delete deletes blocks indicated by keys. Tombstones are recorded and flushed
// to disk before the in-memory index is updated, so the delete survives a crash.
func (t *TSMReader) Delete(keys [][]byte) error {
	if err := t.tombstoner.Add(keys); err != nil {
		return err
	}

	if err := t.tombstoner.Flush(); err != nil {
		return err
	}

	t.index.Delete(keys)
	return nil
}
// OverlapsTimeRange returns true if the time range of the file intersects min and max.
func (t *TSMReader) OverlapsTimeRange(min, max int64) bool {
	return t.index.OverlapsTimeRange(min, max)
}

// OverlapsKeyRange returns true if the key range of the file intersects min and max.
func (t *TSMReader) OverlapsKeyRange(min, max []byte) bool {
	return t.index.OverlapsKeyRange(min, max)
}

// TimeRange returns the min and max time across all keys in the file.
func (t *TSMReader) TimeRange() (int64, int64) {
	return t.index.TimeRange()
}

// KeyRange returns the min and max key across all keys in the file.
func (t *TSMReader) KeyRange() ([]byte, []byte) {
	return t.index.KeyRange()
}

// KeyCount returns the count of unique keys in the TSMReader.
func (t *TSMReader) KeyCount() int {
	return t.index.KeyCount()
}

// Entries returns all index entries for key.
func (t *TSMReader) Entries(key []byte) []IndexEntry {
	return t.index.Entries(key)
}

// ReadEntries reads the index entries for key into entries.
func (t *TSMReader) ReadEntries(key []byte, entries *[]IndexEntry) []IndexEntry {
	return t.index.ReadEntries(key, entries)
}

// IndexSize returns the size of the index in bytes.
func (t *TSMReader) IndexSize() uint32 {
	return t.index.Size()
}
// Size returns the size of the underlying file in bytes.
func (t *TSMReader) Size() uint32 {
	t.mu.RLock()
	defer t.mu.RUnlock()
	return uint32(t.size)
}
// LastModified returns the most recent modification time of the underlying
// file or any of its tombstone files.
func (t *TSMReader) LastModified() int64 {
	t.mu.RLock()
	defer t.mu.RUnlock()

	lm := t.lastModified
	for _, ts := range t.tombstoner.TombstoneFiles() {
		if ts.LastModified > lm {
			lm = ts.LastModified
		}
	}
	return lm
}
// HasTombstones returns true if there are any tombstone entries recorded.
func (t *TSMReader) HasTombstones() bool {
	t.mu.RLock()
	defer t.mu.RUnlock()
	return t.tombstoner.HasTombstones()
}
// TombstoneFiles returns any tombstone files associated with this TSM file.
func (t *TSMReader) TombstoneFiles() []FileStat {
	t.mu.RLock()
	defer t.mu.RUnlock()
	return t.tombstoner.TombstoneFiles()
}
// TombstoneRange returns ranges of time that are deleted for the given key.
func (t *TSMReader) TombstoneRange(key []byte) []TimeRange {
	t.mu.RLock()
	defer t.mu.RUnlock()
	return t.index.TombstoneRange(key)
}
// Stats returns the FileStat for the TSMReader's underlying file: path, size,
// last-modified time, min/max time and key ranges, and whether tombstones exist.
func (t *TSMReader) Stats() FileStat {
	minTime, maxTime := t.index.TimeRange()
	minKey, maxKey := t.index.KeyRange()
	return FileStat{
		Path:         t.Path(),
		Size:         t.Size(),
		LastModified: t.LastModified(),
		MinTime:      minTime,
		MaxTime:      maxTime,
		MinKey:       minKey,
		MaxKey:       maxKey,
		HasTombstone: t.tombstoner.HasTombstones(),
	}
}
// BlockIterator returns a BlockIterator for the underlying TSM file. The key
// count is captured up front; Next() detects concurrent deletes by comparing
// it against the live count.
func (t *TSMReader) BlockIterator() *BlockIterator {
	return &BlockIterator{
		r: t,
		n: t.index.KeyCount(),
	}
}
// BatchDeleter is a transactional delete operation: stage one or more
// DeleteRange calls, then finish with exactly one Commit or Rollback.
type BatchDeleter interface {
	DeleteRange(keys [][]byte, min, max int64) error
	Commit() error
	Rollback() error
}

// batchDelete implements BatchDeleter for a single TSMReader. The reader's
// deleteMu is held from BatchDelete() until Commit or Rollback releases it.
type batchDelete struct {
	r *TSMReader
}
// DeleteRange stages tombstones for keys between minTime and maxTime. Files
// whose key range or time range cannot contain the arguments are skipped
// without writing anything.
func (b *batchDelete) DeleteRange(keys [][]byte, minTime, maxTime int64) error {
	if len(keys) == 0 {
		return nil
	}

	// If the keys can't exist in this TSM file, skip it.
	if !b.r.index.OverlapsKeyRange(keys[0], keys[len(keys)-1]) {
		return nil
	}

	// If the timerange can't exist in this TSM file, skip it.
	if !b.r.index.OverlapsTimeRange(minTime, maxTime) {
		return nil
	}

	return b.r.tombstoner.AddRange(keys, minTime, maxTime)
}
// Commit flushes the staged tombstones to disk and applies them to the
// in-memory index, then releases the delete lock taken by BatchDelete.
func (b *batchDelete) Commit() error {
	defer b.r.deleteMu.Unlock()
	if err := b.r.tombstoner.Flush(); err != nil {
		return err
	}

	return b.r.applyTombstones()
}
// Rollback discards the staged tombstones, then releases the delete lock
// taken by BatchDelete.
func (b *batchDelete) Rollback() error {
	defer b.r.deleteMu.Unlock()
	return b.r.tombstoner.Rollback()
}
// BatchDelete returns a BatchDeleter. Only a single goroutine may run a BatchDelete at a time.
// Callers must either Commit or Rollback the operation.
func (r *TSMReader) BatchDelete() BatchDeleter {
	// deleteMu stays locked until the returned batchDelete's Commit or
	// Rollback unlocks it.
	r.deleteMu.Lock()
	return &batchDelete{r: r}
}
type BatchDeleters []BatchDeleter
// DeleteRange stages the delete concurrently on every underlying deleter and
// returns one of the errors encountered (if any); all goroutines are always
// drained.
func (a BatchDeleters) DeleteRange(keys [][]byte, min, max int64) error {
	errC := make(chan error, len(a))
	for _, d := range a {
		go func(d BatchDeleter) { errC <- d.DeleteRange(keys, min, max) }(d)
	}

	var result error
	for range a {
		if err := <-errC; err != nil {
			result = err
		}
	}
	return result
}
// Commit commits every underlying deleter concurrently and returns one of the
// errors encountered (if any); all goroutines are always drained.
func (a BatchDeleters) Commit() error {
	errC := make(chan error, len(a))
	for _, d := range a {
		go func(d BatchDeleter) { errC <- d.Commit() }(d)
	}

	var result error
	for range a {
		if err := <-errC; err != nil {
			result = err
		}
	}
	return result
}
// Rollback rolls back every underlying deleter concurrently and returns one
// of the errors encountered (if any); all goroutines are always drained.
func (a BatchDeleters) Rollback() error {
	errC := make(chan error, len(a))
	for _, d := range a {
		go func(d BatchDeleter) { errC <- d.Rollback() }(d)
	}

	var result error
	for range a {
		if err := <-errC; err != nil {
			result = err
		}
	}
	return result
}
// indirectIndex is a TSMIndex that uses a raw byte slice representation of an index. This
// implementation can be used for indexes that may be MMAPed into memory.
type indirectIndex struct {
	mu sync.RWMutex

	// indirectIndex works as follows. Assuming we have an index structure in memory as
	// the diagram below:
	//
	// ┌────────────────────────────────────────────────────────────────────┐
	// │ Index                                                              │
	// ├─┬──────────────────────┬──┬───────────────────────┬───┬────────────┘
	// │0│                      │62│                       │145│
	// ├─┴───────┬─────────┬────┼──┴──────┬─────────┬──────┼───┴─────┬──────┐
	// │Key 1 Len│   Key   │... │Key 2 Len│  Key 2  │ ...  │  Key 3  │ ...  │
	// │ 2 bytes │ N bytes │    │ 2 bytes │ N bytes │      │ 2 bytes │      │
	// └─────────┴─────────┴────┴─────────┴─────────┴──────┴─────────┴──────┘
	//
	// We build an `offsets` slice where each element points to the byte location
	// of a key in the index slice.
	//
	// ┌────────────────────────────────────────────────────────────────────┐
	// │ Offsets                                                            │
	// ├────┬────┬────┬─────────────────────────────────────────────────────┘
	// │ 0  │ 62 │145 │
	// └────┴────┴────┘
	//
	// Using this offset slice we can find `Key 2` by doing a binary search
	// over the offsets slice. Instead of comparing the value in the offsets
	// (e.g. `62`), we use that as an index into the underlying index to
	// retrieve the key at position `62` and perform our comparisons with that.
	//
	// When we have identified the correct position in the index for a given
	// key, we could perform another binary search or a linear scan. This
	// should be fast as well since each index entry is 28 bytes and all
	// contiguous in memory. The current implementation uses a linear scan since the
	// number of block entries is expected to be < 100 per key.

	// b is the underlying index byte slice. This could be a copy on the heap or an MMAP
	// slice reference.
	b []byte

	// offsets contains the positions in b for each key, as 4-byte big-endian
	// values. Each points to the 2 byte length of the key.
	offsets []byte

	// minKey, maxKey are the minimum and maximum (lexicographically sorted) keys contained in the
	// file.
	minKey, maxKey []byte

	// minTime, maxTime are the minimum and maximum times contained in the file across all
	// series.
	minTime, maxTime int64

	// tombstones contains only the tombstoned keys with subset of time values deleted. An
	// entry would exist here if a subset of the points for a key were deleted and the file
	// had not been re-compacted to remove the points on disk.
	tombstones map[string][]TimeRange
}
// TimeRange holds a min and max timestamp.
type TimeRange struct {
	Min, Max int64
}

// Overlaps reports whether the inclusive range [min, max] intersects t.
func (t TimeRange) Overlaps(min, max int64) bool {
	// Equivalent (by De Morgan) to: t.Min <= max && t.Max >= min.
	return !(t.Min > max || t.Max < min)
}
// NewIndirectIndex returns a new, empty indirect index.
func NewIndirectIndex() *indirectIndex {
	idx := &indirectIndex{}
	idx.tombstones = make(map[string][]TimeRange)
	return idx
}
// offset returns the byte position in d.b for the i'th key, or -1 when i is
// out of range. Each offsets entry is a 4-byte big-endian position.
func (d *indirectIndex) offset(i int) int {
	// Entry i occupies offsets bytes [i*4, i*4+4), so the valid bound is
	// i*4+4 <= len(d.offsets) — matching Key/KeyAt. The previous check
	// (i+4 > len) under-constrained i and allowed an out-of-range slice below.
	if i < 0 || i*4+4 > len(d.offsets) {
		return -1
	}
	return int(binary.BigEndian.Uint32(d.offsets[i*4 : i*4+4]))
}
// Seek returns the position in the index where key would sort (see
// searchOffset), taken under the read lock.
func (d *indirectIndex) Seek(key []byte) int {
	d.mu.RLock()
	defer d.mu.RUnlock()
	return d.searchOffset(key)
}
// searchOffset searches the offsets slice for key and returns the entry
// position (not byte position) in offsets where key exists or would be
// inserted. If key sorts after every key, the entry count is returned.
func (d *indirectIndex) searchOffset(key []byte) int {
	// We use a binary search across our indirect offsets (pointers to all the keys
	// in the index slice).
	i := bytesutil.SearchBytesFixed(d.offsets, 4, func(x []byte) bool {
		// x is the 4-byte offsets element we are at, so decode the byte
		// position in d.b it points to.
		offset := int32(binary.BigEndian.Uint32(x))

		// It's pointing to the start of the key which is a 2 byte length.
		keyLen := int32(binary.BigEndian.Uint16(d.b[offset : offset+2]))

		// See if it matches.
		return bytes.Compare(d.b[offset+2:offset+2+keyLen], key) >= 0
	})

	// i is a byte position into offsets; divide by the 4-byte stride to get
	// the entry position.
	if i < len(d.offsets) {
		return int(i / 4)
	}

	// The key is not in the index. i is the index where it would be inserted so return
	// a value outside our offset range.
	return int(len(d.offsets)) / 4
}
// search returns the byte position of key in the index slice d.b. If key is
// not in the index, len(d.b) is returned.
func (d *indirectIndex) search(key []byte) int {
	// Fast reject: key outside this file's min/max key range.
	if !d.ContainsKey(key) {
		return len(d.b)
	}

	// We use a binary search across our indirect offsets (pointers to all the keys
	// in the index slice).
	// TODO(sgc): this should be inlined to `indirectIndex` as it is only used here
	i := bytesutil.SearchBytesFixed(d.offsets, 4, func(x []byte) bool {
		// x is the 4-byte offsets element we are at, so decode the byte
		// position in d.b it points to.
		offset := int32(binary.BigEndian.Uint32(x))

		// It's pointing to the start of the key which is a 2 byte length.
		keyLen := int32(binary.BigEndian.Uint16(d.b[offset : offset+2]))

		// See if it matches.
		return bytes.Compare(d.b[offset+2:offset+2+keyLen], key) >= 0
	})

	// See if we might have found the right index.
	if i < len(d.offsets) {
		ofs := binary.BigEndian.Uint32(d.offsets[i : i+4])
		_, k := readKey(d.b[ofs:])

		// The search may have returned an i == 0 which could indicate that the value
		// searched should be inserted at position 0. Make sure the key in the index
		// matches the search value.
		if !bytes.Equal(key, k) {
			return len(d.b)
		}

		return int(ofs)
	}

	// The key is not in the index. i is the index where it would be inserted so return
	// a value outside our offset range.
	return len(d.b)
}
// ContainsKey returns true if key may exist in this index: it only checks
// that key falls within the file's [minKey, maxKey] range, so false
// positives are possible.
func (d *indirectIndex) ContainsKey(key []byte) bool {
	if bytes.Compare(key, d.minKey) < 0 {
		return false
	}
	return bytes.Compare(key, d.maxKey) <= 0
}
// Entries returns all index entries for a key, delegating to ReadEntries
// without a reusable buffer.
func (d *indirectIndex) Entries(key []byte) []IndexEntry {
	return d.ReadEntries(key, nil)
}
// readEntriesAt decodes the key stored at byte offset ofs in d.b and all of
// its index entries, reusing *entries as the destination buffer when
// provided. It panics if the entries cannot be decoded (corrupt index data).
func (d *indirectIndex) readEntriesAt(ofs int, entries *[]IndexEntry) ([]byte, []IndexEntry) {
	n, k := readKey(d.b[ofs:])

	// Read and return all the entries
	ofs += n
	var ie indexEntries
	if entries != nil {
		ie.entries = *entries
	}
	if _, err := readEntries(d.b[ofs:], &ie); err != nil {
		panic(fmt.Sprintf("error reading entries: %v", err))
	}
	if entries != nil {
		*entries = ie.entries
	}
	return k, ie.entries
}
// ReadEntries returns all index entries for a key, reusing *entries as the
// destination buffer when provided. Returns nil if the key is not present.
func (d *indirectIndex) ReadEntries(key []byte, entries *[]IndexEntry) []IndexEntry {
	d.mu.RLock()
	defer d.mu.RUnlock()

	ofs := d.search(key)
	if ofs < len(d.b) {
		// Note: this shadows the entries parameter with the decoded slice.
		k, entries := d.readEntriesAt(ofs, entries)

		// The search may have returned an i == 0 which could indicate that the value
		// searched should be inserted at position 0. Make sure the key in the index
		// matches the search value.
		if !bytes.Equal(key, k) {
			return nil
		}

		return entries
	}

	// The key is not in the index. i is the index where it would be inserted.
	return nil
}
// Entry returns the index entry for the specified key and timestamp, or nil
// when no block for that key contains the timestamp.
func (d *indirectIndex) Entry(key []byte, timestamp int64) *IndexEntry {
	for _, e := range d.Entries(key) {
		if e.Contains(timestamp) {
			// Return a pointer to a copy, not into the entries slice.
			match := e
			return &match
		}
	}
	return nil
}
// Key returns the key, block type and index entries at entry position idx,
// reusing *entries as the destination buffer when provided. Returns zero
// values when idx is out of range or the entries cannot be decoded.
func (d *indirectIndex) Key(idx int, entries *[]IndexEntry) ([]byte, byte, []IndexEntry) {
	d.mu.RLock()
	defer d.mu.RUnlock()

	// Each offsets entry is 4 bytes, so entry idx lives at bytes [idx*4, idx*4+4).
	if idx < 0 || idx*4+4 > len(d.offsets) {
		return nil, 0, nil
	}
	ofs := binary.BigEndian.Uint32(d.offsets[idx*4 : idx*4+4])
	n, key := readKey(d.b[ofs:])

	// The block type byte immediately follows the key.
	typ := d.b[int(ofs)+n]

	var ie indexEntries
	if entries != nil {
		ie.entries = *entries
	}
	if _, err := readEntries(d.b[int(ofs)+n:], &ie); err != nil {
		return nil, 0, nil
	}
	if entries != nil {
		*entries = ie.entries
	}

	return key, typ, ie.entries
}
// KeyAt returns the key and block type at entry position idx, or (nil, 0)
// when idx is out of range.
func (d *indirectIndex) KeyAt(idx int) ([]byte, byte) {
	d.mu.RLock()
	defer d.mu.RUnlock()

	// Each offsets entry is 4 bytes wide.
	if idx < 0 || idx*4+4 > len(d.offsets) {
		return nil, 0
	}
	ofs := int(binary.BigEndian.Uint32(d.offsets[idx*4 : idx*4+4]))
	n, key := readKey(d.b[ofs:])
	// The block type byte immediately follows the key.
	return key, d.b[ofs+n]
}
// KeyCount returns the count of unique keys in the index.
func (d *indirectIndex) KeyCount() int {
	d.mu.RLock()
	defer d.mu.RUnlock()
	// One 4-byte offsets entry per key.
	return len(d.offsets) / 4
}
// Delete removes the given keys from the index by overwriting their offsets
// entries with nilOffset (all 0xFF) and then packing those entries out of the
// offsets slice. keys are sorted first so the index and key lists can be
// walked in lock-step.
func (d *indirectIndex) Delete(keys [][]byte) {
	if len(keys) == 0 {
		return
	}

	if !bytesutil.IsSorted(keys) {
		bytesutil.Sort(keys)
	}

	// Both keys and offsets are sorted. Walk both in order and skip
	// any keys that exist in both.
	d.mu.Lock()
	start := d.searchOffset(keys[0])
	for i := start * 4; i+4 <= len(d.offsets) && len(keys) > 0; i += 4 {
		offset := binary.BigEndian.Uint32(d.offsets[i : i+4])
		_, indexKey := readKey(d.b[offset:])

		// Drop keys that sort before the current index key; they are not in
		// this file.
		for len(keys) > 0 && bytes.Compare(keys[0], indexKey) < 0 {
			keys = keys[1:]
		}

		if len(keys) > 0 && bytes.Equal(keys[0], indexKey) {
			keys = keys[1:]
			// Mark the entry deleted; compacted out by Pack below.
			copy(d.offsets[i:i+4], nilOffset[:])
		}
	}
	// Remove all 4-byte entries consisting entirely of the 255 marker byte.
	d.offsets = bytesutil.Pack(d.offsets, 4, 255)
	d.mu.Unlock()
}
// DeleteRange removes the given keys with data between minTime and maxTime from the index.
func (d *indirectIndex) DeleteRange(keys [][]byte, minTime, maxTime int64) {
// No keys, nothing to do
if len(keys) == 0 {
return
}
if !bytesutil.IsSorted(keys) {
bytesutil.Sort(keys)
}
// If we're deleting the max time range, just use tombstoning to remove the
// key from the offsets slice
if minTime == math.MinInt64 && maxTime == math.MaxInt64 {