-
Notifications
You must be signed in to change notification settings - Fork 1
/
record_buffers.go
94 lines (83 loc) · 3.03 KB
/
record_buffers.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
package recordbuffer
import (
"github.com/kvanticoss/goutils/iterator"
)
// SortedRecordBuffers creates a simple LSM like buffer for records and uses the ReadWriteResetterFactory
// for byte storage of the records
type SortedRecordBuffers struct {
partitions []*recordBuffer
factory ReadWriteResetterFactory
newer func() iterator.Lesser
recordWriters []func(interface{}) (int, error)
clusters []iterator.Lesser
}
// NewSortedRecordBuffers creates a new SortedRecordBuffer using the underlying ReadWriteResetterFactory
func NewSortedRecordBuffers(factory ReadWriteResetterFactory, newer func() iterator.Lesser) *SortedRecordBuffers {
return &SortedRecordBuffers{
partitions: []*recordBuffer{},
factory: factory,
newer: newer,
}
}
// LoadFromRecordIterator populates the buffer through a record iterator.
func (srb *SortedRecordBuffers) LoadFromRecordIterator(it iterator.RecordIterator) (int, error) {
// Read all the records in "this folder" into the cache
return srb.LoadFromLesserIterator(it.ToLesserIterator())
}
// LoadFromLesserIterator will add a single record to the buffer
func (srb *SortedRecordBuffers) LoadFromLesserIterator(it iterator.LesserIterator) (int, error) {
// Read all the records in "this folder" into the cache
var rec iterator.Lesser
var err error
var bytesWritten int
for rec, err = it(); err == nil; rec, err = it() {
// Ensured to be lesser due since it is produced by newer
if n, err := srb.AddRecord(rec); err != nil {
return bytesWritten, err
} else {
bytesWritten += n
}
}
if err == iterator.ErrIteratorStop {
return bytesWritten, nil
}
return bytesWritten, err
}
// AddRecord will add a single record to the buffer
func (srb *SortedRecordBuffers) AddRecord(record interface{}) (int, error) {
nextVal, ok := record.(iterator.Lesser)
if !ok {
return 0, iterator.ErrNotLesser
}
return srb.AddLesser(nextVal)
}
// AddLesser will add a single record to the buffer
func (srb *SortedRecordBuffers) AddLesser(nextVal iterator.Lesser) (int, error) {
var lastRecordInCluster iterator.Lesser
var index int
for index, lastRecordInCluster = range srb.clusters {
if !nextVal.Less(lastRecordInCluster) {
srb.clusters[index] = nextVal
return srb.recordWriters[index](nextVal)
}
}
// A new cluster index might be needed
srb.clusters = append(srb.clusters, nextVal)
cache := srb.decoratedCacheFactory()
srb.partitions = append(srb.partitions, cache)
srb.recordWriters = append(srb.recordWriters, cache.WriteRecord)
return cache.WriteRecord(nextVal)
}
// GetSortedIterator returns an (sorted) interator for all the records stored in the buffer
func (srb *SortedRecordBuffers) GetSortedIterator() (iterator.LesserIterator, error) {
var sortedIterators []iterator.LesserIterator
for _, cache := range srb.partitions {
sortedIterators = append(sortedIterators, cache.GetLesserIt(srb.newer))
}
return iterator.SortedLesserIterators(sortedIterators)
}
func (srb *SortedRecordBuffers) decoratedCacheFactory() *recordBuffer {
return &recordBuffer{
ReadWriteResetter: srb.factory(),
}
}