-
Notifications
You must be signed in to change notification settings - Fork 348
/
skip_prefix_iterator.go
155 lines (138 loc) · 4.17 KB
/
skip_prefix_iterator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
package committed
import (
"sort"
"strings"
"github.com/treeverse/lakefs/pkg/graveler"
)
type rangeValue struct {
r *Range
vr *graveler.ValueRecord
}
type ImportIterator interface {
IsCurrentRangeBoundedByPrefix() bool
IsCurrentPrefixIncludedInRange() bool
}
type SkipPrefixIterator struct {
currentPrefixIndex int
prefixes []graveler.Prefix
rangeIterator Iterator
currentRangeValue rangeValue
}
func (ipi *SkipPrefixIterator) Value() (*graveler.ValueRecord, *Range) {
return ipi.currentRangeValue.vr, ipi.currentRangeValue.r
}
func (ipi *SkipPrefixIterator) updateValue() (*graveler.ValueRecord, *Range) {
vr, r := ipi.rangeIterator.Value()
ipi.currentRangeValue = rangeValue{
r,
vr,
}
return vr, r
}
func (ipi *SkipPrefixIterator) Err() error {
return ipi.rangeIterator.Err()
}
func (ipi *SkipPrefixIterator) Close() {
ipi.rangeIterator.Close()
}
func (ipi *SkipPrefixIterator) SeekGE(id graveler.Key) {
ipi.rangeIterator.SeekGE(id)
}
func (ipi *SkipPrefixIterator) Next() bool {
if !ipi.rangeIterator.Next() {
return false
}
vr, r := ipi.updateValue()
ipi.updatePrefix()
if vr == nil && r != nil { // head of range
for ipi.IsCurrentRangeBoundedByPrefix() {
if !ipi.rangeIterator.NextRange() {
return false
}
ipi.updateValue()
ipi.updatePrefix()
}
} else {
prefixLen := len(ipi.prefixes)
for vr != nil && ipi.currentPrefixIndex < prefixLen && strings.HasPrefix(vr.Key.String(), string(ipi.prefixes[ipi.currentPrefixIndex])) {
if !ipi.rangeIterator.Next() {
return false
}
vr, _ = ipi.updateValue()
ipi.updatePrefix()
}
}
return true
}
func (ipi *SkipPrefixIterator) NextRange() bool {
if !ipi.rangeIterator.NextRange() {
return false
}
ipi.updateValue()
ipi.updatePrefix()
for ipi.IsCurrentRangeBoundedByPrefix() {
if !ipi.rangeIterator.NextRange() {
return false
}
ipi.updateValue()
ipi.updatePrefix()
}
return true
}
func (ipi *SkipPrefixIterator) updatePrefix() {
if ipi.currentPrefixIndex >= len(ipi.prefixes) {
return
}
currMinKey := string(ipi.currentRangeValue.r.MinKey)
if ipi.currentRangeValue.vr != nil {
currMinKey = string(ipi.currentRangeValue.vr.Key)
}
// If the current prefix is smaller and isn't the prefix of the currentMinKey, get the next prefix.
// By the end of this loop, the examined prefix will either be the prefix of the currentMinKey, or
// lexicographically bigger than it.
for string(ipi.prefixes[ipi.currentPrefixIndex]) < currMinKey &&
!strings.HasPrefix(currMinKey, string(ipi.prefixes[ipi.currentPrefixIndex])) {
p := ipi.currentPrefixIndex + 1
switch {
case p >= len(ipi.prefixes): // No more comparable prefixes
ipi.currentPrefixIndex = p // Block next call for updatePrefix
return
case string(ipi.prefixes[p]) <= string(ipi.currentRangeValue.r.MaxKey):
ipi.currentPrefixIndex = p
default:
return
}
}
}
func (ipi *SkipPrefixIterator) getPrefix() *graveler.Prefix {
if ipi.currentPrefixIndex >= len(ipi.prefixes) {
return nil
}
return &ipi.prefixes[ipi.currentPrefixIndex]
}
// IsCurrentRangeBoundedByPrefix returns true if both the range's max and min keys have the current prefix.
func (ipi *SkipPrefixIterator) IsCurrentRangeBoundedByPrefix() bool {
p := ipi.getPrefix()
if p == nil {
return false
}
r := ipi.currentRangeValue.r
return strings.HasPrefix(string(r.MinKey), string(*p)) && strings.HasPrefix(string(r.MaxKey), string(*p))
}
// IsCurrentPrefixIncludedInRange returns true if the examined prefix is either a prefix of the range's min or max key,
// or if the prefix is between the range's min and max keys.
func (ipi *SkipPrefixIterator) IsCurrentPrefixIncludedInRange() bool {
p := ipi.getPrefix()
if p == nil {
return false
}
r := ipi.currentRangeValue.r
inRange := strings.Compare(string(*p), string(r.MinKey)) >= 0 && strings.Compare(string(*p), string(r.MaxKey)) <= 0
return strings.HasPrefix(string(r.MinKey), string(*p)) || inRange
}
func NewSkipPrefixIterator(prefixes []graveler.Prefix, rangeIterator Iterator) *SkipPrefixIterator {
sort.Slice(prefixes, func(i, j int) bool {
return prefixes[i] < prefixes[j]
})
return &SkipPrefixIterator{prefixes: prefixes, currentPrefixIndex: 0, rangeIterator: rangeIterator}
}